From 340219020bc00f948d1ba931323acaa1b331974c Mon Sep 17 00:00:00 2001 From: bogdan-rosianu <51945539+bogdan-rosianu@users.noreply.github.com> Date: Wed, 23 Oct 2024 16:21:03 +0300 Subject: [PATCH] Refactoring: move classes, docs, delay between requests feature (#4) * refactoring * version bump * fix test * bugfix * fix missing timestamp clause * e2e tests with chainsimulator (#5) * first PoC of chain simulator e2e test * fixes, esdt test, workflow * merge * fix hosts * new host * chainsimulator hostname * custom network * sleep * remove custom network * update hostnames again * update workflow name * refactoring * add missing eol --- .github/workflows/chainsimulator-tests.yml | 38 ++++ README.md | 4 +- .../chain.simulator.operations.ts | 150 ++++++++++++++ .../ping-pong/ping-pong-egld.abi.json | 126 +++++++++++ .../contracts/ping-pong/ping-pong-egld.wasm | Bin 0 -> 1349 bytes .../chain-simulator-e2e/docker-compose.yml | 37 ++++ .../event.processor.e2e.spec.ts | 195 ++++++++++++++++++ .../overridable-config.toml | 8 + example/example.ts | 1 + package-lock.json | 4 +- package.json | 5 +- src/event.processor.ts | 76 +------ src/test/event.processor.spec.ts | 45 +++- src/types/elastic.event.source.ts | 13 ++ src/types/event.processor.options.ts | 82 ++++++++ src/utils/elastic.helpers.ts | 53 +++++ 16 files changed, 757 insertions(+), 80 deletions(-) create mode 100644 .github/workflows/chainsimulator-tests.yml create mode 100644 e2e-tests/chain-simulator-e2e/chain.simulator.operations.ts create mode 100644 e2e-tests/chain-simulator-e2e/contracts/ping-pong/ping-pong-egld.abi.json create mode 100755 e2e-tests/chain-simulator-e2e/contracts/ping-pong/ping-pong-egld.wasm create mode 100644 e2e-tests/chain-simulator-e2e/docker-compose.yml create mode 100644 e2e-tests/chain-simulator-e2e/event.processor.e2e.spec.ts create mode 100644 e2e-tests/chain-simulator-e2e/overridable-config.toml create mode 100644 src/types/elastic.event.source.ts create mode 100644 src/types/event.processor.options.ts create mode 100644 src/utils/elastic.helpers.ts diff --git a/.github/workflows/chainsimulator-tests.yml b/.github/workflows/chainsimulator-tests.yml new file mode 100644 index 0000000..3de8952 --- /dev/null +++ b/.github/workflows/chainsimulator-tests.yml @@ -0,0 +1,38 @@ +name: Chain simulator e2e tests workflow + +on: + pull_request: + +jobs: + test-chainsimulator-e2e: + runs-on: ubuntu-latest + strategy: + matrix: + node-version: [18.x] + steps: + - uses: actions/checkout@v3 + + - name: Build and start the Docker images + run: docker compose -f "e2e-tests/chain-simulator-e2e/docker-compose.yml" up -d --build + + - name: Wait for services to be ready + run: | + echo "Waiting for services to be healthy..." + docker ps + docker logs chainsimulator + sleep 20 # Wait for 20 seconds to ensure services are up + + - name: Print docker containers + run: docker ps + + - name: Use Node.js ${{ matrix.node-version }} + uses: actions/setup-node@v3 + with: + node-version: ${{ matrix.node-version }} + cache: 'npm' + + - name: Install dependencies + run: npm install + + - name: Run e2e tests + run: npm run test:e2e diff --git a/README.md b/README.md index 5224756..f6c0a71 100644 --- a/README.md +++ b/README.md @@ -9,13 +9,15 @@ Event processor for JavaScript and TypeScript (written in TypeScript). ## Usage ```js -let eventProcessor = new EventProcessor(); +let lastProcessedTimestamp = 1727858320; +const eventProcessor = new EventProcessor(); await eventProcessor.start({ elasticUrl: 'https://index.multiversx.com', eventIdentifiers: ['swapTokensFixedInput'], emitterAddresses: ['erd1qqqqqqqqqqqqqpgqt0uek344kaerr4gf9g2r8l0f4l8ygyha2jps82u9r6'], pageSize: 1000, scrollTimeout: "1m", + delayBetweenRequestsInMilliseconds: 100, getLastProcessedTimestamp: async () => { return lastProcessedTimestamp; }, diff --git a/e2e-tests/chain-simulator-e2e/chain.simulator.operations.ts b/e2e-tests/chain-simulator-e2e/chain.simulator.operations.ts new file mode 100644 index 0000000..2bc4eb9 --- /dev/null +++ b/e2e-tests/chain-simulator-e2e/chain.simulator.operations.ts @@ -0,0 +1,150 @@ +import axios from "axios"; + +const VM_TYPE = "0500"; +const CODE_METADATA = "0100"; +const SC_DEPLOY_ADDRESS = 'erd1qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq6gq4hu'; +const ESDT_ADDRESS = 'erd1qqqqqqqqqqqqqqqpqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqzllls8a5w6u'; + +export async function fundAddress(chainSimulatorUrl: string, address: string) { + const payload = [ + { + address: address, + balance: '100000000000000000000000', + }, + ]; + await axios.post(`${chainSimulatorUrl}/simulator/set-state`, payload); +} + +export async function getNonce(chainSimulatorUrl: string, address: string): Promise { + try { + const currentNonceResponse = await axios.get(`${chainSimulatorUrl}/address/${address}/nonce`); + return currentNonceResponse.data.data.nonce; + } catch (e) { + console.error(e); + return 0; + } +} + +export async function deploySc(args: DeployScArgs): Promise { + try { + const contractCodeHex = Buffer.from(args.contractCodeRaw).toString('hex'); + const contractArgs = [VM_TYPE, CODE_METADATA, ...args.hexArguments]; + const contractPayload = contractCodeHex + '@' + contractArgs.join('@'); + + const txHash = await sendTransaction(new SendTransactionArgs({ + chainSimulatorUrl: args.chainSimulatorUrl, + sender: args.deployer, + receiver: SC_DEPLOY_ADDRESS, + dataField: contractPayload, + })); + + const txResponse = await axios.get(`${args.chainSimulatorUrl}/transaction/${txHash}?withResults=true`); + const scDeployLog = txResponse?.data?.data?.transaction?.logs?.events?.find((event: { identifier: string; }) => event.identifier === 'SCDeploy'); + console.log(`Deployed SC. tx hash: ${txHash}. address: ${scDeployLog?.address}`); + return scDeployLog?.address; + } catch (e) { + console.error(e); + return 'n/a'; + } +} + +export async function issueEsdt(args: IssueEsdtArgs) { + const txHash = await sendTransaction(new SendTransactionArgs({ + chainSimulatorUrl: args.chainSimulatorUrl, + sender: args.issuer, + receiver: ESDT_ADDRESS, + dataField: `issue@${Buffer.from(args.tokenName).toString('hex')}@${Buffer.from(args.tokenTicker).toString('hex')}@1e9b0e04e39e5845000000@12`, + value: '50000000000000000', + })); + + const txResponse = await axios.get(`${args.chainSimulatorUrl}/transaction/${txHash}?withResults=true`); + const esdtIssueLog = txResponse?.data?.data?.transaction?.logs?.events?.find((event: { identifier: string; }) => event.identifier === 'issue'); + const tokenIdentifier = Buffer.from(esdtIssueLog.topics[0], 'base64').toString(); + console.log(`Issued token with ticker ${args.tokenTicker}. tx hash: ${txHash}. identifier: ${tokenIdentifier}`); + return tokenIdentifier; +} + +export async function transferEsdt(args: TransferEsdtArgs) { + const transferValue = args.plainAmountOfTokens * (10 ** 18); + return await sendTransaction(new SendTransactionArgs({ + chainSimulatorUrl: args.chainSimulatorUrl, + sender: args.sender, + receiver: args.receiver, + dataField: `ESDTTransfer@${Buffer.from(args.tokenIdentifier).toString('hex')}@${transferValue.toString(16)}`, + value: '0', + })); +} + +export async function sendTransaction(args: SendTransactionArgs): Promise { + try { + const nonce = await getNonce(args.chainSimulatorUrl, args.sender); + + const tx = { + sender: args.sender, + receiver: args.receiver, + nonce: nonce, + value: args.value, + gasPrice: 1000000000, + gasLimit: args.gasLimit, + data: Buffer.from(args.dataField).toString('base64'), + signature: 'a'.repeat(128), + chainID: 'chain', + version: 1, + }; + + const txHashResponse = await axios.post(`${args.chainSimulatorUrl}/transaction/send`, tx); + const txHash = txHashResponse.data.data.txHash; + await axios.post(`${args.chainSimulatorUrl}/simulator/generate-blocks-until-transaction-processed/${txHash}`); + return txHash; + } catch (e) { + console.error(e); + return 'n/a'; + } +} + +export class SendTransactionArgs { + chainSimulatorUrl: string = ''; + sender: string = ''; + receiver: string = ''; + dataField: string = ''; + value?: string = '0'; + gasLimit?: number = 100_000_000; + + constructor(options: Partial = {}) { + Object.assign(this, options); + } +} + +export class IssueEsdtArgs { + chainSimulatorUrl: string = ''; + issuer: string = ''; + tokenName: string = ''; + tokenTicker: string = ''; + + constructor(options: Partial = {}) { + Object.assign(this, options); + } +} + +export class TransferEsdtArgs { + chainSimulatorUrl: string = ''; + sender: string = ''; + receiver: string = ''; + tokenIdentifier: string = ''; + plainAmountOfTokens: number = 1; + + constructor(options: Partial = {}) { + Object.assign(this, options); + } +} + +export class DeployScArgs { + chainSimulatorUrl: string = ''; + deployer: string = ''; + contractCodeRaw: Buffer = Buffer.from(''); + hexArguments: string[] = []; + + constructor(options: Partial = {}) { + Object.assign(this, options); + } +} diff --git a/e2e-tests/chain-simulator-e2e/contracts/ping-pong/ping-pong-egld.abi.json b/e2e-tests/chain-simulator-e2e/contracts/ping-pong/ping-pong-egld.abi.json new file mode 100644 index 0000000..5419a3e --- /dev/null +++ b/e2e-tests/chain-simulator-e2e/contracts/ping-pong/ping-pong-egld.abi.json @@ -0,0 +1,126 @@ +{ + "buildInfo": { + "rustc": { + "version": "1.81.0", + "commitHash": "eeb90cda1969383f56a2637cbd3037bdf598841c", + "commitDate": "2024-09-04", + "channel": "Stable", + "short": "rustc 1.81.0 (eeb90cda1 2024-09-04)" + }, + "contractCrate": { + "name": "ping-pong-egld", + "version": "0.0.2", + "gitVersion": "v0.53.2-4-g190f26fac" + }, + "framework": { + "name": "multiversx-sc", + "version": "0.53.2" + } + }, + "docs": [ + "An example contract that emits logs for ping and pong calls. on init you can configure the egld amount to be called at ping" + ], + "name": "PingPong", + "constructor": { + "docs": [ + "Necessary configuration when deploying:", + "`ping_amount` - the exact EGLD amount that needs to be sent when `ping`-ing." + ], + "inputs": [ + { + "name": "ping_amount", + "type": "BigUint" + } + ], + "outputs": [] + }, + "upgradeConstructor": { + "inputs": [ + { + "name": "ping_amount", + "type": "BigUint" + } + ], + "outputs": [] + }, + "endpoints": [ + { + "docs": [ + "User sends some EGLD to be locked in the contract for a period of time.", + "Optional `_data` argument is ignored." + ], + "name": "ping", + "mutability": "mutable", + "payableInTokens": [ + "EGLD" + ], + "inputs": [ + { + "name": "_data", + "type": "ignore", + "multi_arg": true + } + ], + "outputs": [] + }, + { + "docs": [ + "User can take back funds from the contract.", + "Can only be called after expiration." + ], + "name": "pong", + "mutability": "mutable", + "inputs": [], + "outputs": [] + }, + { + "name": "getPingAmount", + "mutability": "readonly", + "inputs": [], + "outputs": [ + { + "type": "BigUint" + } + ] + } + ], + "events": [ + { + "docs": [ + "Signals a successful ping by user with amount" + ], + "identifier": "", + "inputs": [ + { + "name": "caller", + "type": "Address", + "indexed": true + }, + { + "name": "pinged_amount", + "type": "BigUint" + } + ] + }, + { + "docs": [ + "Signals a successful pong by user with amount" + ], + "identifier": "", + "inputs": [ + { + "name": "caller", + "type": "Address", + "indexed": true + }, + { + "name": "ponged_amount", + "type": "BigUint" + } + ] + } + ], + "esdtAttributes": [], + "hasCallback": false, + "types": {} +} diff --git a/e2e-tests/chain-simulator-e2e/contracts/ping-pong/ping-pong-egld.wasm b/e2e-tests/chain-simulator-e2e/contracts/ping-pong/ping-pong-egld.wasm new file mode 100755 index 0000000000000000000000000000000000000000..fcf8eecd259400aa10d3f33b808d2e3baac68cca GIT binary patch literal 1349 zcmZuxO>f&q5S`tnELtmBE7_4Fxk*-v0_2iw5cK4AYWD+$5x{PHlxb=85u2n!F75gx z`_OZH=%Kw8D0<2v$Nq?3`WN~~+Rl)4;sPZ=?(UnJH}ht92|*e%0AP=+C%{%Ge6_+a z08QY86xdNnzM71CQhX3#*mZLwi}2)%Z^V1;xgAZpUYb@vGn}SH&se zX6aZaq`?;Mn}y!}qfd^HlORe_WU0CRzEjk{uY>c<7A;&a3@^iFLD4MR{7tfOsAX0( za;xw#!Z^cS?E7OZcP|*9!JwKgXGt)z@US)-O#&B;aGwN| zqtV<3%h73&TET1YyjGpDmD(JDVj5%@7v_f>pWde&3_BaZa}H?P0wpvYc)Lw$e!uoj zR{_XY5z94bro5i!w+KcWAOz$A`aA}s&IWu4nrmk6JN#V#E#4U4B{=mj@)cPwlH@tB zBON#O*Tz#7tCSzZ|L48ca!xQ-!(to_|E_a98H<| z2cN3uJAGDXl1zqu3}7;~AyL-@)m6;~Q|&%i9}NUb+|dH@o>G{bI*0p(C(As^`TxS% zkY+(pM}0p|9WMu%DIkjRba a+95n$d}}8<%|f~l%kuYUnWfk{6A literal 0 HcmV?d00001 diff --git a/e2e-tests/chain-simulator-e2e/docker-compose.yml b/e2e-tests/chain-simulator-e2e/docker-compose.yml new file mode 100644 index 0000000..2948457 --- /dev/null +++ b/e2e-tests/chain-simulator-e2e/docker-compose.yml @@ -0,0 +1,37 @@ +services: + elasticsearch: + ports: + - "9200:9200" + container_name: es-container + image: docker.elastic.co/elasticsearch/elasticsearch:7.16.1 + environment: + - "discovery.type=single-node" + - "xpack.security.enabled=false" + - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + ulimits: + memlock: + soft: -1 + hard: -1 + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:9200" ] + interval: 10s + timeout: 5s + retries: 5 + + chainsimulator: + container_name: chainsimulator + image: multiversx/chainsimulator + command: ["--node-override-config", "./overridable-config.toml"] + volumes: + - ./overridable-config.toml:/multiversx/overridable-config.toml + depends_on: + - elasticsearch + environment: + ELASTIC_SEARCH_URL: 'http://localhost:9200' + ports: + - "8085:8085" + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:8085/simulator/observers" ] + interval: 10s + timeout: 5s + retries: 5 diff --git a/e2e-tests/chain-simulator-e2e/event.processor.e2e.spec.ts b/e2e-tests/chain-simulator-e2e/event.processor.e2e.spec.ts new file mode 100644 index 0000000..337a15a --- /dev/null +++ b/e2e-tests/chain-simulator-e2e/event.processor.e2e.spec.ts @@ -0,0 +1,195 @@ +import axios from "axios"; +import { deploySc, sendTransaction, fundAddress, issueEsdt, transferEsdt, IssueEsdtArgs, TransferEsdtArgs, DeployScArgs } from "./chain.simulator.operations"; +import * as fs from "node:fs"; +import { EventProcessor } from "../../src/event.processor"; +import { EventProcessorOptions } from "../../src/types/event.processor.options"; + +const CHAIN_SIMULATOR_URL = 'http://localhost:8085'; +const ELASTIC_SEARCH_URL = 'http://localhost:9200'; +const ALICE_ADDRESS = 'erd1qyu5wthldzr8wx5c9ucg8kjagg0jfs53s8nr3zpz3hypefsdd8ssycr6th'; +const BOB_ADDRESS = 'erd1spyavw0956vq68xj8y4tenjpq2wd5a9p2c6j8gsz7ztyrnpxrruqzu66jx'; +const CAROL_ADDRESS = 'erd1k2s324ww2g0yj38qn2ch2jwctdy8mnfxep94q9arncc6xecg3xaq6mjse8'; +const ONE_EGLD = '1000000000000000000'; +const VERBOSE_LOGS = false; + +describe('EventProcessor e2e tests with chain simulator', () => { + let eventProcessor: EventProcessor; + + beforeAll(async () => { + try { + const response = await axios.get(`${CHAIN_SIMULATOR_URL}/simulator/observers`); + + let numRetries = 0; + while (true) { + if (response.status === 200) { + await axios.post(`${CHAIN_SIMULATOR_URL}/simulator/generate-blocks-until-epoch-reached/2`, {}); + break; + } + + numRetries += 1; + if (numRetries > 50) { + fail("Chain simulator not started!"); + } + } + } catch (e) { + console.error(e); + } + }); + + beforeEach(() => { + eventProcessor = new EventProcessor(); + jest.clearAllMocks(); + }); + + it('should deploy a contract and receive all the ping pong events', async () => { + const contractCodeRaw = fs.readFileSync('./e2e-tests/chain-simulator-e2e/contracts/ping-pong/ping-pong-egld.wasm'); + /* + fn init( + &self, + ping_amount: &BigUint, + ) { + */ + const contractArgs = [ + '0de0b6b3a7640000', // 1 egld + ]; + await fundAddress(CHAIN_SIMULATOR_URL, ALICE_ADDRESS); + const scAddress = await deploySc(new DeployScArgs({ + chainSimulatorUrl: CHAIN_SIMULATOR_URL, + deployer: ALICE_ADDRESS, + contractCodeRaw: contractCodeRaw, + hexArguments: contractArgs, + })); + logMessage(`Deployed ping pong SC. Address: ${scAddress}`); + + const numPingPongs = 20; + + for (let i = 0; i < numPingPongs; i++) { + const pingTxHash = await sendTransaction({ + chainSimulatorUrl: CHAIN_SIMULATOR_URL, + sender: ALICE_ADDRESS, + receiver: scAddress, + dataField: 'ping', + value: ONE_EGLD, + gasLimit: 20_000_000, + }); + logMessage(`Called 'ping' function of the contract. Tx hash: ${pingTxHash}`); + + const pongTxHash = await sendTransaction({ + chainSimulatorUrl: CHAIN_SIMULATOR_URL, + sender: ALICE_ADDRESS, + receiver: scAddress, + dataField: 'pong', + value: '0', + gasLimit: 20_000_000, + }); + logMessage(`Called 'pong' function of the contract. Tx hash: ${pongTxHash}`); + } + + await new Promise(resolve => setTimeout(resolve, 2000)); + + let numOfEventsReceived = 0; + let lastProcessedTimestamp = 0; + let counter = 0; + while (true) { + await eventProcessor.start(new EventProcessorOptions({ + emitterAddresses: [scAddress], + eventIdentifiers: ['ping', 'pong'], + getLastProcessedTimestamp: async () => lastProcessedTimestamp, + elasticUrl: ELASTIC_SEARCH_URL, + onEventsReceived: async (highestTimestamp, events) => { + logMessage(`event processor received ${events.length} events with the highest timestamp ${highestTimestamp}`); + numOfEventsReceived += events.length; + }, + setLastProcessedTimestamp: async (timestamp) => { + lastProcessedTimestamp = timestamp; + }, + pageSize: 7, + delayBetweenRequestsInMilliseconds: 100, + })); + + await new Promise(resolve => setTimeout(resolve, 100)); + logMessage(`Running event processor #${counter + 1}`); + + counter++; + if (counter > 10 || numOfEventsReceived === 2 * numPingPongs) { + break; + } + } + + expect(numOfEventsReceived).toBe(numPingPongs * 2); + }, 100000); + + it('should issue a token and event processor should receive esdt transfers', async () => { + await fundAddress(CHAIN_SIMULATOR_URL, BOB_ADDRESS); + const esdtIdentifier = await issueEsdt(new IssueEsdtArgs({ + chainSimulatorUrl: CHAIN_SIMULATOR_URL, + issuer: BOB_ADDRESS, + tokenName: 'BobToken', + tokenTicker: 'BOB', + })); + + const numTransfers = 20; + for (let i = 0; i < numTransfers; i++) { + const bobTransferTxHash = await transferEsdt(new TransferEsdtArgs({ + chainSimulatorUrl: CHAIN_SIMULATOR_URL, + sender: BOB_ADDRESS, + receiver: ALICE_ADDRESS, + tokenIdentifier: esdtIdentifier, + plainAmountOfTokens: 5, + })); + logMessage(`Transferred 5 tokens to Bob. Tx hash: ${bobTransferTxHash}}`); + const carolTransferTxHash = await transferEsdt(new TransferEsdtArgs({ + chainSimulatorUrl: CHAIN_SIMULATOR_URL, + sender: BOB_ADDRESS, + receiver: CAROL_ADDRESS, + tokenIdentifier: esdtIdentifier, + plainAmountOfTokens: 5, + })); + logMessage(`Transferred 5 tokens to Carol. Tx hash: ${carolTransferTxHash}}`); + } + + await new Promise(resolve => setTimeout(resolve, 2000)); + + let numOfEventsReceived = 0; + let lastProcessedTimestamp = 0; + let counter = 0; + while (true) { + await eventProcessor.start(new EventProcessorOptions({ + emitterAddresses: [BOB_ADDRESS], + eventIdentifiers: ['ESDTTransfer'], + shardId: 0, // Bob is in shard 0. esdt transfers events are emitted on both source and destination shards + getLastProcessedTimestamp: async () => lastProcessedTimestamp, + elasticUrl: ELASTIC_SEARCH_URL, + onEventsReceived: async (highestTimestamp, events) => { + logMessage(`event processor received ${events.length} events with the highest timestamp ${highestTimestamp}`); + for (const event of events) { + if (event && event.topics && event.topics[0] === Buffer.from(esdtIdentifier).toString('hex')) { + numOfEventsReceived++; + } + } + }, + setLastProcessedTimestamp: async (timestamp) => { + lastProcessedTimestamp = timestamp; + }, + pageSize: 7, + delayBetweenRequestsInMilliseconds: 100, + })); + + await new Promise(resolve => setTimeout(resolve, 100)); + logMessage(`Running event processor #${counter + 1}`); + + counter++; + if (counter > 10 || numOfEventsReceived === 2 * numTransfers) { + break; + } + } + + expect(numOfEventsReceived).toBe(numTransfers * 2); + }, 100000); +}); + +function logMessage(message: string) { + if (VERBOSE_LOGS) { + console.log(message); + } +} diff --git a/e2e-tests/chain-simulator-e2e/overridable-config.toml b/e2e-tests/chain-simulator-e2e/overridable-config.toml new file mode 100644 index 0000000..e9aa2d6 --- /dev/null +++ b/e2e-tests/chain-simulator-e2e/overridable-config.toml @@ -0,0 +1,8 @@ +OverridableConfigTomlValues = [ + { File = "external.toml", Path = "ElasticSearchConnector.Enabled", Value = "true" }, + { File = "external.toml", Path = "ElasticSearchConnector.URL", Value = "http://elasticsearch:9200" }, + { File = "enableEpochs.toml", Path = "EnableEpochs.StakeLimitsEnableEpoch", Value = "1000000" }, + { File = "systemSmartContractsConfig.toml", Path = "ESDTSystemSCConfig.BaseIssuingCost", Value = "50000000000000000" }, # (0.05 EGLD) + { File = "config.toml", Path = "Debug.Process.Enabled", Value = "false" }, + { File = "config.toml", Path = "WebServerAntiflood.WebServerAntifloodEnabled", Value = "false"} +] diff --git a/example/example.ts b/example/example.ts index 37a329a..6a8e9cc 100644 --- a/example/example.ts +++ b/example/example.ts @@ -13,6 +13,7 @@ async function run() { emitterAddresses: ['erd1qqqqqqqqqqqqqpgqt0uek344kaerr4gf9g2r8l0f4l8ygyha2jps82u9r6'], pageSize: 1000, scrollTimeout: "1m", + delayBetweenRequestsInMilliseconds: 100, getLastProcessedTimestamp: async () => { return lastProcessedTimestamp; }, diff --git a/package-lock.json b/package-lock.json index 4ce30e5..d313b64 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@multiversx/sdk-event-processor", - "version": "1.0.1", + "version": "1.0.2", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@multiversx/sdk-event-processor", - "version": "1.0.1", + "version": "1.0.2", "license": "GPL-3.0-or-later", "dependencies": { "axios": "^1.7.7" diff --git a/package.json b/package.json index 6a9b48d..ebc9724 100644 --- a/package.json +++ b/package.json @@ -1,10 +1,11 @@ { "name": "@multiversx/sdk-event-processor", - "version": "1.0.1", + "version": "1.0.2", "description": "MultiversX logs&events processor", "main": "index.js", "scripts": { - "test": "jest", + "test": "jest --testPathPattern='(? { + private async callElasticsearchEvents(lastProcessedTimestamp: number): Promise { try { const url = `${this.options.elasticUrl}/events/_search?scroll=${this.options.scrollTimeout}`; - const elasticQuery = this.generateElasticsearchQuery(lastProcessedTimestamp); + const elasticQuery = generateElasticsearchQuery(lastProcessedTimestamp, this.options); const result = await axios.post(url, elasticQuery); const elasticEvents = result?.data?.hits?.hits ?? []; @@ -58,6 +61,10 @@ export class EventProcessor { return; } while (true) { + if (this.options.delayBetweenRequestsInMilliseconds) { + await new Promise(resolve => setTimeout(resolve, this.options.delayBetweenRequestsInMilliseconds)); + } + const scrollResult = await axios.post(`${this.options.elasticUrl}/_search/scroll`, { scroll_id: scrollId, @@ -89,69 +96,4 @@ export class EventProcessor { await setLatestProcessedTimestampFunc(lastTimestamp); } } - - generateElasticsearchQuery(timestamp: number) { - return { - size: this.options.pageSize, - query: { - bool: { - must: [ - { - terms: { - identifier: this.options.eventIdentifiers, // Query by identifiers - }, - }, - { - terms: { - address: this.options.emitterAddresses, // Query by addresses - }, - }, - { - range: { - timestamp: { - gt: `${timestamp}`, - }, - }, - }, - ], - }, - }, - sort: [ - { - timestamp: { - order: 'asc', // Sorting by timestamp in ascending order - }, - }, - ], - }; - } -} - -export class EventProcessorOptions { - elasticUrl?: string; - emitterAddresses?: string[]; - eventIdentifiers?: string[]; - pageSize?: number = 10000; - scrollTimeout?: string = "1m"; - onEventsReceived?: (highestTimestamp: number, events: EventSource[]) => void | Promise; - getLastProcessedTimestamp?: () => Promise; - setLastProcessedTimestamp?: (timestamp: number) => Promise; - - constructor(options: Partial = {}) { - Object.assign(this, options); - } -} - -export class EventSource { - originalTxHash?: string; - logAddress?: string; - identifier?: string; - address?: string; - topics?: string[]; - shardID?: number; - additionalData?: string[]; - txOrder?: number; - txHash?: string; - order?: number; - timestamp?: number; } diff --git a/src/test/event.processor.spec.ts b/src/test/event.processor.spec.ts index 2164687..3982763 100644 --- a/src/test/event.processor.spec.ts +++ b/src/test/event.processor.spec.ts @@ -1,5 +1,6 @@ import { EventProcessor } from "../event.processor"; import axios from "axios"; +import { EventProcessorOptions } from "../types/event.processor.options"; // Mock axios.post jest.mock('axios'); @@ -69,7 +70,7 @@ describe('EventProcessor', () => { getLastProcessedTimestamp: async () => 37, elasticUrl: 'https://myelastic.com', onEventsReceived: async () => {}, - setLastProcessedTimestamp: async (timestamp: number) => {}, + setLastProcessedTimestamp: async () => {}, })).rejects.toThrow(/Cannot call Elasticsearch/); expect(axios.post).toHaveBeenCalledTimes(1); }); @@ -86,8 +87,8 @@ describe('EventProcessor', () => { emitterAddresses: ['erd1nz88q5pevl6up2qxsgpqgc0qmnm93lh888wwwa68kmz363kdwz9q8tnems'], getLastProcessedTimestamp: async () => 37, elasticUrl: 'https://myelastic.com', - onEventsReceived: async (highestTimestamp, events) => {receivedEvents = events;}, - setLastProcessedTimestamp: async (timestamp: number) => {}, + onEventsReceived: async (_highestTimestamp, events) => {receivedEvents = events;}, + setLastProcessedTimestamp: async () => {}, }); expect(receivedEvents?.length).toBe(0); @@ -104,8 +105,8 @@ describe('EventProcessor', () => { emitterAddresses: ['erd1nz88q5pevl6up2qxsgpqgc0qmnm93lh888wwwa68kmz363kdwz9q8tnems'], getLastProcessedTimestamp: async () => 37, elasticUrl: 'https://httpbin.com/404', - onEventsReceived: async (highestTimestamp, events) => {receivedEvents = events;}, - setLastProcessedTimestamp: async (timestamp: number) => {}, + onEventsReceived: async (_highestTimestamp, events) => {receivedEvents = events;}, + setLastProcessedTimestamp: async () => {}, }); expect(receivedEvents?.length).toBe(0); @@ -126,8 +127,8 @@ describe('EventProcessor', () => { emitterAddresses: ['erd1nz88q5pevl6up2qxsgpqgc0qmnm93lh888wwwa68kmz363kdwz9q8tnems'], getLastProcessedTimestamp: async () => 37, elasticUrl: 'https://myelastic.com', - onEventsReceived: async (highestTimestamp, events) => {receivedEvents = events;}, - setLastProcessedTimestamp: async (timestamp: number) => {}, + onEventsReceived: async (_highestTimestamp, events) => {receivedEvents = events;}, + setLastProcessedTimestamp: async () => {}, }); expect(receivedEvents?.length).toBe(1); @@ -155,7 +156,7 @@ describe('EventProcessor', () => { numTimesReceivedEvents++; receivedEvents.push(...events); }, - setLastProcessedTimestamp: async (timestamp: number) => {}, + setLastProcessedTimestamp: async () => {}, pageSize: 10, }); @@ -166,6 +167,34 @@ describe('EventProcessor', () => { expect(axios.post).toHaveBeenCalledTimes(3); expect(numTimesReceivedEvents).toBe(2); }); + + it('should work and sleep between consecutive requests', async () => { + let numTimesReceivedEvents = 0; + const mockResponse1 = createElasticSearchResponse(0, 1, 2, 3); + const mockResponse2 = createElasticSearchResponse(); + // Mock axios.post to return a resolved Promise with mockResponse + (axios.post as jest.Mock) + .mockResolvedValueOnce(mockResponse1) // 1st call + .mockResolvedValueOnce(mockResponse2); // 2nd call + + const start = Date.now(); + + await eventProcessor.start(new EventProcessorOptions({ + emitterAddresses: ['erd1nz88q5pevl6up2qxsgpqgc0qmnm93lh888wwwa68kmz363kdwz9q8tnems'], + getLastProcessedTimestamp: async () => 37, + elasticUrl: 'https://myelastic.com', + onEventsReceived: async () => { + numTimesReceivedEvents++; + }, + setLastProcessedTimestamp: async () => {}, + pageSize: 10, + delayBetweenRequestsInMilliseconds: 1001, + })); + + const duration = Date.now() - start; + expect(duration).toBeGreaterThan(1000); + expect(numTimesReceivedEvents).toBe(1); + }); }); function createElasticSearchResponse(...timestamps: number[]) { diff --git a/src/types/elastic.event.source.ts b/src/types/elastic.event.source.ts new file mode 100644 index 0000000..5dc3a13 --- /dev/null +++ b/src/types/elastic.event.source.ts @@ -0,0 +1,13 @@ +export class EventSource { + originalTxHash?: string; + logAddress?: string; + identifier?: string; + address?: string; + topics?: string[]; + shardID?: number; + additionalData?: string[]; + txOrder?: number; + txHash?: string; + order?: number; + timestamp?: number; +} diff --git a/src/types/event.processor.options.ts b/src/types/event.processor.options.ts new file mode 100644 index 0000000..2bbe3e7 --- /dev/null +++ b/src/types/event.processor.options.ts @@ -0,0 +1,82 @@ +import { EventSource } from "./elastic.event.source"; + +/** + * Options for configuring the Event Processor. + */ +export class EventProcessorOptions { + /** + * URL of the Elasticsearch instance to connect to. + * @type {string | undefined} + */ + elasticUrl?: string; + + /** + * List of emitter addresses to filter events by. + * @type {string[] | undefined} + */ + emitterAddresses?: string[]; + + /** + * List of event identifiers to filter events by. + * @type {string[] | undefined} + */ + eventIdentifiers?: string[]; + + /** + * Shard ID to filter events by. Useful for example for ESDT transfers when a log will be issued on both source and destination shard + * @type {number | undefined} + */ + shardId?: number; + + /** + * Number of events to process per page. Defaults to 10,000. + * @type {number} + * @default 10000 + */ + pageSize?: number = 10000; + + /** + * Scroll timeout duration for Elasticsearch queries. + * Specifies how long Elasticsearch should keep the search context alive between scroll requests. + * Defaults to "1m" (1 minute). + * @type {string} + * @default "1m" + */ + scrollTimeout?: string = "1m"; + + /** + * Delay between sending requests, in milliseconds. + * This prevents overwhelming the Elasticsearch server with requests and is useful when there is a possibility of being rate limited by public instances. + * Defaults to 100 milliseconds. + * @type {number} + * @default 100 + */ + delayBetweenRequestsInMilliseconds?: number = 100; + + /** + * Callback that is triggered when events are received. + * The function takes the highest timestamp and the received events as arguments. + * Can return either a `void` or a `Promise` if asynchronous processing is required. + * @type {(highestTimestamp: number, events: EventSource[]) => void | Promise} + */ + onEventsReceived?: (highestTimestamp: number, events: EventSource[]) => void | Promise; + + /** + * Callback to retrieve the last processed timestamp. + * Should return a `Promise` that resolves to the last processed timestamp, or `undefined` if there is none. + * @type {() => Promise} + */ + getLastProcessedTimestamp?: () => Promise; + + /** + * Callback to set the last processed timestamp. + * Takes a timestamp as an argument and returns a `Promise`. + * @type {(timestamp: number) => Promise} + */ + setLastProcessedTimestamp?: (timestamp: number) => Promise; + + + constructor(options: Partial = {}) { + Object.assign(this, options); + } +} diff --git a/src/utils/elastic.helpers.ts b/src/utils/elastic.helpers.ts new file mode 100644 index 0000000..9b43002 --- /dev/null +++ b/src/utils/elastic.helpers.ts @@ -0,0 +1,53 @@ +import { EventProcessorOptions } from "../types/event.processor.options"; + +export function generateElasticsearchQuery(timestamp: number, options: EventProcessorOptions) { + const mustClauses = []; + + if (options.eventIdentifiers && options.eventIdentifiers.length > 0) { + mustClauses.push({ + terms: { + identifier: options.eventIdentifiers, + }, + }); + } + + if (options.emitterAddresses && options.emitterAddresses.length > 0) { + mustClauses.push({ + terms: { + address: options.emitterAddresses, + }, + }); + } + + if (options.shardId !== undefined) { + mustClauses.push({ + term: { + shardID: options.shardId, + }, + }); + } + + mustClauses.push({ + range: { + timestamp: { + gt: `${timestamp}`, + }, + }, + }); + + return { + size: options.pageSize, + query: { + bool: { + must: mustClauses, + }, + }, + sort: [ + { + timestamp: { + order: 'asc', // Sorting by timestamp in ascending order + }, + }, + ], + }; +}