diff --git a/src/db/models/WasmStateEventTransformation.ts b/src/db/models/WasmStateEventTransformation.ts index 5d5b6529..7a26d386 100644 --- a/src/db/models/WasmStateEventTransformation.ts +++ b/src/db/models/WasmStateEventTransformation.ts @@ -193,6 +193,10 @@ export class WasmStateEventTransformation extends DependableEventModel { static async transformParsedStateEvents( events: ParsedWasmStateEvent[] ): Promise { + if (events.length === 0) { + return [] + } + const chainId = (await State.getSingleton())?.chainId || 'unknown' const transformers = getProcessedTransformers(loadConfig()) if (transformers.length === 0) { diff --git a/src/queues/index.ts b/src/queues/index.ts index fed4d087..7da4731c 100644 --- a/src/queues/index.ts +++ b/src/queues/index.ts @@ -1,6 +1,7 @@ import { BaseQueue, QueueOptions } from './base' import { ExportQueue } from './export' import { SearchQueue } from './search' +import { TransformationsQueue } from './transform' import { WebhooksQueue } from './webhooks' // Hack to fix generic constructor on abstract class. @@ -8,7 +9,12 @@ type IQueue = { new (options: QueueOptions): BaseQueue } & typeof BaseQueue -export const queues: IQueue[] = [ExportQueue, SearchQueue, WebhooksQueue] +export const queues: IQueue[] = [ + ExportQueue, + SearchQueue, + WebhooksQueue, + TransformationsQueue, +] export * from './base' export * from './connection' diff --git a/src/queues/transform.ts b/src/queues/transform.ts new file mode 100644 index 00000000..1cc8b7b4 --- /dev/null +++ b/src/queues/transform.ts @@ -0,0 +1,182 @@ +import { Job, Queue } from 'bullmq' +import { Op } from 'sequelize' + +import { Contract, WasmStateEvent, WasmStateEventTransformation } from '@/db' +import { WasmCodeService } from '@/services' + +import { BaseQueue } from './base' +import { closeBullQueue, getBullQueue } from './connection' + +export type TransformationsQueuePayload = { + /** + * Minimum block height. Defaults to 0. + */ + minBlockHeight?: number + /** + * Batch size. Defaults to 5,000. + */ + batchSize?: number + /** + * Transform set of addresses. + */ + addresses?: string[] + /** + * Transform contracts matching code IDs keys. + */ + codeIdsKeys?: string[] + /** + * Transform contracts matching code IDs. + */ + codeIds?: number[] +} + +export class TransformationsQueue extends BaseQueue { + static queueName = 'transformations' + + static getQueue = () => + getBullQueue(this.queueName) + static add = async ( + ...params: Parameters['add']> + ) => (await this.getQueue()).add(...params) + static addBulk = async ( + ...params: Parameters['addBulk']> + ) => (await this.getQueue()).addBulk(...params) + static close = () => closeBullQueue(this.queueName) + + async process({ + data: { + minBlockHeight = 0, + batchSize = 5000, + addresses, + codeIdsKeys, + codeIds: _codeIds = [], + }, + }: Job): Promise { + const foundCodeIds = WasmCodeService.getInstance().findWasmCodeIdsByKeys( + ...(codeIdsKeys || []) + ) + if (codeIdsKeys?.length && !foundCodeIds.length) { + throw new Error( + `no code IDs found for code IDs keys: ${codeIdsKeys.join(', ')}` + ) + } + + const codeIds = [..._codeIds, ...foundCodeIds] + + const addressFilter = addresses?.length + ? { + contractAddress: addresses, + } + : undefined + + if (!addressFilter && !codeIds.length) { + throw new Error('no contract address nor code ID filter provided') + } else { + console.log( + `transforming events for contract addresses: ${addresses?.join( + ', ' + )} and code IDs: ${codeIds.join(', ')}` + ) + } + + const includeContract = { + include: { + model: Contract, + required: true, + where: + codeIds.length > 0 + ? { + codeId: { + [Op.in]: codeIds, + }, + } + : undefined, + }, + } + + let latestBlockHeight = minBlockHeight + const total = await WasmStateEvent.count({ + where: { + ...addressFilter, + blockHeight: { + [Op.gte]: latestBlockHeight, + }, + }, + ...includeContract, + }) + + console.log(`found ${total.toLocaleString()} events to transform...`) + + let processed = 0 + let transformed = 0 + + let latestBlockEventIdsSeen: number[] = [] + while (processed < total) { + const events = await WasmStateEvent.findAll({ + where: { + ...addressFilter, + // Since there can be multiple events per block, the fixed batch size + // will likely end up leaving some events in the latest block out of + // this batch. To fix this, repeat the latest block again (>=) + // excluding the events we've already seen. + blockHeight: { + [Op.gte]: latestBlockHeight, + }, + ...(latestBlockEventIdsSeen.length > 0 && { + id: { + [Op.notIn]: latestBlockEventIdsSeen, + }, + }), + }, + limit: batchSize, + order: [['blockHeight', 'ASC']], + ...includeContract, + }) + + // If there are no more events, we're done. + if (events.length === 0) { + break + } + + const newLatestBlockHeight = events[events.length - 1].blockHeight + + // If the latest block height is the same as the previous latest block + // height, we are still in the same block and should append the event IDs + // to the list instead of replacing it. This will only happen if the batch + // size is smaller than the maximum number of events in any one block. + // Otherwise, we're in a new block and should reset the list. + if (Number(newLatestBlockHeight) === latestBlockHeight) { + latestBlockEventIdsSeen = latestBlockEventIdsSeen.concat( + events.map((event) => event.id) + ) + } else { + latestBlockEventIdsSeen = events + .filter((event) => event.blockHeight === newLatestBlockHeight) + .map((event) => event.id) + } + + processed += events.length + latestBlockHeight = Number(newLatestBlockHeight) + + const transformations = + await WasmStateEventTransformation.transformParsedStateEvents( + events.map((event) => event.asParsedEvent) + ) + + // const { updated, destroyed } = update + // ? await updateComputationValidityDependentOnChanges(transformations) + // : { + // updated: 0, + // destroyed: 0, + // } + + transformed += transformations.length + + console.log( + `transformed/processed/total: ${transformed.toLocaleString()}/${processed.toLocaleString()}/${total.toLocaleString()}. latest block height: ${latestBlockHeight.toLocaleString()}` + ) + } + + console.log('done!') + } +} diff --git a/src/scripts/transform.ts b/src/scripts/transform.ts index cff0117a..040598c9 100644 --- a/src/scripts/transform.ts +++ b/src/scripts/transform.ts @@ -1,18 +1,10 @@ import { Command } from 'commander' -import { Op } from 'sequelize' import { loadConfig } from '@/core' -import { - Contract, - WasmStateEvent, - WasmStateEventTransformation, - loadDb, - updateComputationValidityDependentOnChanges, -} from '@/db' +import { loadDb } from '@/db' +import { TransformationsQueue } from '@/queues/transform' import { WasmCodeService } from '@/services/wasm-codes' -const LOADER_MAP = ['—', '\\', '|', '/'] - const main = async () => { // Parse arguments. const program = new Command() @@ -59,7 +51,7 @@ const main = async () => { console.log(`\n[${new Date().toISOString()}] Transforming existing events...`) // Load config with config option. - loadConfig(_config) + const config = loadConfig(_config) // Load DB on start. const sequelize = await loadDb() @@ -67,137 +59,20 @@ const main = async () => { // Set up wasm code service. await WasmCodeService.setUpInstance() - let processed = 0 - let computationsUpdated = 0 - let computationsDestroyed = 0 - let transformed = 0 - - const addressFilter = addresses?.length - ? { - contractAddress: addresses, - } - : {} - - const extractedCodeIdsKeys = WasmCodeService.extractWasmCodeKeys(codeIdsKeys) - const codeIds = WasmCodeService.getInstance().findWasmCodeIdsByKeys( - ...extractedCodeIdsKeys - ) - - if (extractedCodeIdsKeys.length > 0 && codeIds.length === 0) { - throw new Error( - 'No code IDs found matching keys: ' + extractedCodeIdsKeys.join(', ') - ) - } - - const includeContract = { - include: { - model: Contract, - required: true, - where: - codeIds.length > 0 - ? { - codeId: { - [Op.in]: codeIds, - }, - } - : undefined, - }, - } - - let latestBlockHeight = initial - const total = await WasmStateEvent.count({ - where: { - ...addressFilter, - blockHeight: { - [Op.gte]: latestBlockHeight, - }, + // Use queue process function directly. + await new TransformationsQueue({ + config, + updateComputations: !!update, + sendWebhooks: false, + }).process({ + data: { + minBlockHeight: initial, + batchSize: batch, + addresses, + codeIdsKeys: WasmCodeService.extractWasmCodeKeys(codeIdsKeys), }, - ...includeContract, - }) - - // Print latest statistics every 100ms. - let printLoaderCount = 0 - const printStatistics = () => { - printLoaderCount = (printLoaderCount + 1) % LOADER_MAP.length - process.stdout.write( - `\r${ - LOADER_MAP[printLoaderCount] - } Transformed: ${transformed.toLocaleString()}. Event processed/total: ${processed.toLocaleString()}/${total.toLocaleString()}. Computations updated/destroyed: ${computationsUpdated.toLocaleString()}/${computationsDestroyed.toLocaleString()}. Latest block height: ${latestBlockHeight.toLocaleString()}` - ) - } - const logInterval = setInterval(printStatistics, 100) - // Allow process to exit even though this interval is alive. - logInterval.unref() - - let latestBlockEventIdsSeen: number[] = [] - while (processed < total) { - const events = await WasmStateEvent.findAll({ - where: { - ...addressFilter, - // Since there can be multiple events per block, the fixed batch size - // will likely end up leaving some events in the latest block out of - // this batch. To fix this, repeat the latest block again (>=) excluding - // the events we've already seen. - blockHeight: { - [Op.gte]: latestBlockHeight, - }, - ...(latestBlockEventIdsSeen.length > 0 && { - id: { - [Op.notIn]: latestBlockEventIdsSeen, - }, - }), - }, - limit: batch, - order: [['blockHeight', 'ASC']], - ...includeContract, - }) - - // If there are no more events, we're done. - if (events.length === 0) { - break - } - - const newLatestBlockHeight = events[events.length - 1].blockHeight - - // If the latest block height is the same as the previous latest block - // height, we are still in the same block and should append the event IDs to - // the list instead of replacing it. This will only happen if the batch size - // is smaller than the maximum number of events in any one block. Otherwise, - // we're in a new block and should reset the list. - if (Number(newLatestBlockHeight) === latestBlockHeight) { - latestBlockEventIdsSeen = latestBlockEventIdsSeen.concat( - events.map((event) => event.id) - ) - } else { - latestBlockEventIdsSeen = events - .filter((event) => event.blockHeight === newLatestBlockHeight) - .map((event) => event.id) - } - - processed += events.length - latestBlockHeight = Number(newLatestBlockHeight) - - const transformations = - await WasmStateEventTransformation.transformParsedStateEvents( - events.map((event) => event.asParsedEvent) - ) - - transformed += transformations.length - - const { updated, destroyed } = update - ? await updateComputationValidityDependentOnChanges(transformations) - : { - updated: 0, - destroyed: 0, - } - - computationsUpdated += updated - computationsDestroyed += destroyed - } - - clearInterval(logInterval) + } as any) - printStatistics() console.log(`\n[${new Date().toISOString()}] Transforming complete.`) await sequelize.close() diff --git a/src/services/wasm-codes/wasm-code.service.ts b/src/services/wasm-codes/wasm-code.service.ts index 75efe49f..77242bd5 100644 --- a/src/services/wasm-codes/wasm-code.service.ts +++ b/src/services/wasm-codes/wasm-code.service.ts @@ -163,11 +163,14 @@ export class WasmCodeService implements WasmCodeAdapter { * Find all code IDs for the list of keys. */ findWasmCodeIdsByKeys(...keys: string[]): number[] { - return keys.flatMap( - (key: string) => - this.wasmCodes.find((wasmCode: WasmCode) => wasmCode.codeKey === key) - ?.codeIds ?? [] - ) + return keys.length === 0 + ? [] + : keys.flatMap( + (key: string) => + this.wasmCodes.find( + (wasmCode: WasmCode) => wasmCode.codeKey === key + )?.codeIds ?? [] + ) } /**