Skip to content

Commit

Permalink
add transformation logic to new queue
Browse files Browse the repository at this point in the history
  • Loading branch information
NoahSaso committed Jun 16, 2024
1 parent 6c3910f commit be6a77e
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 146 deletions.
4 changes: 4 additions & 0 deletions src/db/models/WasmStateEventTransformation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ export class WasmStateEventTransformation extends DependableEventModel {
static async transformParsedStateEvents(
events: ParsedWasmStateEvent[]
): Promise<WasmStateEventTransformation[]> {
if (events.length === 0) {
return []
}

const chainId = (await State.getSingleton())?.chainId || 'unknown'
const transformers = getProcessedTransformers(loadConfig())
if (transformers.length === 0) {
Expand Down
8 changes: 7 additions & 1 deletion src/queues/index.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
import { BaseQueue, QueueOptions } from './base'
import { ExportQueue } from './export'
import { SearchQueue } from './search'
import { TransformationsQueue } from './transform'
import { WebhooksQueue } from './webhooks'

// Hack to fix generic constructor on abstract class: intersect an explicit
// construct signature (taking `QueueOptions` and producing a `BaseQueue`) with
// the static side of `BaseQueue`, so that the queue classes collected in the
// `queues` list can be typed as something that is both constructible via `new`
// and exposes `BaseQueue`'s static members.
type IQueue = {
new (options: QueueOptions): BaseQueue<any>
} & typeof BaseQueue<any>

export const queues: IQueue[] = [ExportQueue, SearchQueue, WebhooksQueue]
export const queues: IQueue[] = [
ExportQueue,
SearchQueue,
WebhooksQueue,
TransformationsQueue,
]

export * from './base'
export * from './connection'
182 changes: 182 additions & 0 deletions src/queues/transform.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
import { Job, Queue } from 'bullmq'
import { Op } from 'sequelize'

import { Contract, WasmStateEvent, WasmStateEventTransformation } from '@/db'
import { WasmCodeService } from '@/services'

import { BaseQueue } from './base'
import { closeBullQueue, getBullQueue } from './connection'

/**
 * Job payload accepted by `TransformationsQueue`.
 *
 * At least one contract filter must resolve to something: either a non-empty
 * `addresses` list, or at least one code ID (given directly via `codeIds` or
 * resolved from `codeIdsKeys`). Otherwise processing throws.
 */
export type TransformationsQueuePayload = {
/**
* Minimum block height (inclusive) of the state events to transform. Defaults
* to 0.
*/
minBlockHeight?: number
/**
* Batch size, i.e. the maximum number of state events loaded per query.
* Defaults to 5,000.
*/
batchSize?: number
/**
* Transform set of addresses. When non-empty, only events whose
* `contractAddress` is in this list are transformed.
*/
addresses?: string[]
/**
* Transform contracts matching code IDs keys. The keys are resolved to code
* IDs through `WasmCodeService`; processing throws if keys are provided but
* none of them resolve to a code ID.
*/
codeIdsKeys?: string[]
/**
* Transform contracts matching code IDs. Combined with any code IDs resolved
* from `codeIdsKeys`.
*/
codeIds?: number[]
}

/**
 * Queue that re-runs the configured transformers over WASM state events that
 * are already stored in the DB, restricted to a set of contract addresses
 * and/or code IDs.
 */
export class TransformationsQueue extends BaseQueue<TransformationsQueuePayload> {
  static queueName = 'transformations'

  /** Get the underlying bull queue for this queue name. */
  static getQueue = () =>
    getBullQueue<TransformationsQueuePayload>(this.queueName)

  /** Add a single job to the queue; same parameters as bullmq `Queue.add`. */
  static add = async (
    ...params: Parameters<Queue<TransformationsQueuePayload>['add']>
  ) => (await this.getQueue()).add(...params)

  /** Add many jobs at once; same parameters as bullmq `Queue.addBulk`. */
  static addBulk = async (
    ...params: Parameters<Queue<TransformationsQueuePayload>['addBulk']>
  ) => (await this.getQueue()).addBulk(...params)

  /** Close the underlying bull queue. */
  static close = () => closeBullQueue(this.queueName)

  /**
   * Transform all matching state events at or above `minBlockHeight`, loading
   * them in batches of `batchSize` ordered by ascending block height.
   *
   * @param job Job whose `data` selects the events to transform.
   * @throws Error if `codeIdsKeys` are provided but resolve to no code IDs.
   * @throws Error if neither an address filter nor any code ID is provided.
   */
  async process({
    data: {
      minBlockHeight = 0,
      batchSize = 5000,
      addresses,
      codeIdsKeys,
      codeIds: _codeIds = [],
    },
  }: Job<TransformationsQueuePayload>): Promise<void> {
    const foundCodeIds = WasmCodeService.getInstance().findWasmCodeIdsByKeys(
      ...(codeIdsKeys || [])
    )
    if (codeIdsKeys?.length && !foundCodeIds.length) {
      throw new Error(
        `no code IDs found for code IDs keys: ${codeIdsKeys.join(', ')}`
      )
    }

    // Merge explicit and resolved code IDs, dropping duplicates so the same ID
    // is not repeated in the query filter or the log output.
    const codeIds = Array.from(new Set([..._codeIds, ...foundCodeIds]))

    const addressFilter = addresses?.length
      ? {
          contractAddress: addresses,
        }
      : undefined

    if (!addressFilter && !codeIds.length) {
      throw new Error('no contract address nor code ID filter provided')
    }

    // Print a placeholder for a filter that is not set, instead of the literal
    // text "undefined" or an empty string.
    const addressesLabel = addresses?.length ? addresses.join(', ') : '<any>'
    const codeIdsLabel = codeIds.length ? codeIds.join(', ') : '<any>'
    console.log(
      `transforming events for contract addresses: ${addressesLabel} and code IDs: ${codeIdsLabel}`
    )

    // Always join the contract; only constrain its code ID when a code ID
    // filter exists.
    const includeContract = {
      include: {
        model: Contract,
        required: true,
        where:
          codeIds.length > 0
            ? {
                codeId: {
                  [Op.in]: codeIds,
                },
              }
            : undefined,
      },
    }

    let latestBlockHeight = minBlockHeight
    const total = await WasmStateEvent.count({
      where: {
        ...addressFilter,
        blockHeight: {
          [Op.gte]: latestBlockHeight,
        },
      },
      ...includeContract,
    })

    console.log(`found ${total.toLocaleString()} events to transform...`)

    let processed = 0
    let transformed = 0

    // IDs of events already handled in the block at `latestBlockHeight`, so
    // re-querying that block (>=) does not return them again.
    let latestBlockEventIdsSeen: number[] = []
    while (processed < total) {
      const events = await WasmStateEvent.findAll({
        where: {
          ...addressFilter,
          // Since there can be multiple events per block, the fixed batch size
          // will likely end up leaving some events in the latest block out of
          // this batch. To fix this, repeat the latest block again (>=)
          // excluding the events we've already seen.
          blockHeight: {
            [Op.gte]: latestBlockHeight,
          },
          ...(latestBlockEventIdsSeen.length > 0 && {
            id: {
              [Op.notIn]: latestBlockEventIdsSeen,
            },
          }),
        },
        limit: batchSize,
        order: [['blockHeight', 'ASC']],
        ...includeContract,
      })

      // If there are no more events, we're done.
      if (events.length === 0) {
        break
      }

      const newLatestBlockHeight = events[events.length - 1].blockHeight

      // If the latest block height is the same as the previous latest block
      // height, we are still in the same block and should append the event IDs
      // to the list instead of replacing it. This will only happen if the batch
      // size is smaller than the maximum number of events in any one block.
      // Otherwise, we're in a new block and should reset the list.
      if (Number(newLatestBlockHeight) === latestBlockHeight) {
        latestBlockEventIdsSeen = latestBlockEventIdsSeen.concat(
          events.map((event) => event.id)
        )
      } else {
        latestBlockEventIdsSeen = events
          .filter((event) => event.blockHeight === newLatestBlockHeight)
          .map((event) => event.id)
      }

      processed += events.length
      latestBlockHeight = Number(newLatestBlockHeight)

      const transformations =
        await WasmStateEventTransformation.transformParsedStateEvents(
          events.map((event) => event.asParsedEvent)
        )

      // TODO: updating computation validity based on the new transformations
      // is not wired into the queue yet; the intended logic is kept below for
      // reference.
      // const { updated, destroyed } = update
      //   ? await updateComputationValidityDependentOnChanges(transformations)
      //   : {
      //       updated: 0,
      //       destroyed: 0,
      //     }

      transformed += transformations.length

      console.log(
        `transformed/processed/total: ${transformed.toLocaleString()}/${processed.toLocaleString()}/${total.toLocaleString()}. latest block height: ${latestBlockHeight.toLocaleString()}`
      )
    }

    console.log('done!')
  }
}
155 changes: 15 additions & 140 deletions src/scripts/transform.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
import { Command } from 'commander'
import { Op } from 'sequelize'

import { loadConfig } from '@/core'
import {
Contract,
WasmStateEvent,
WasmStateEventTransformation,
loadDb,
updateComputationValidityDependentOnChanges,
} from '@/db'
import { loadDb } from '@/db'
import { TransformationsQueue } from '@/queues/transform'
import { WasmCodeService } from '@/services/wasm-codes'

const LOADER_MAP = ['—', '\\', '|', '/']

const main = async () => {
// Parse arguments.
const program = new Command()
Expand Down Expand Up @@ -59,145 +51,28 @@ const main = async () => {
console.log(`\n[${new Date().toISOString()}] Transforming existing events...`)

// Load config with config option.
loadConfig(_config)
const config = loadConfig(_config)

// Load DB on start.
const sequelize = await loadDb()

// Set up wasm code service.
await WasmCodeService.setUpInstance()

let processed = 0
let computationsUpdated = 0
let computationsDestroyed = 0
let transformed = 0

const addressFilter = addresses?.length
? {
contractAddress: addresses,
}
: {}

const extractedCodeIdsKeys = WasmCodeService.extractWasmCodeKeys(codeIdsKeys)
const codeIds = WasmCodeService.getInstance().findWasmCodeIdsByKeys(
...extractedCodeIdsKeys
)

if (extractedCodeIdsKeys.length > 0 && codeIds.length === 0) {
throw new Error(
'No code IDs found matching keys: ' + extractedCodeIdsKeys.join(', ')
)
}

const includeContract = {
include: {
model: Contract,
required: true,
where:
codeIds.length > 0
? {
codeId: {
[Op.in]: codeIds,
},
}
: undefined,
},
}

let latestBlockHeight = initial
const total = await WasmStateEvent.count({
where: {
...addressFilter,
blockHeight: {
[Op.gte]: latestBlockHeight,
},
// Use queue process function directly.
await new TransformationsQueue({
config,
updateComputations: !!update,
sendWebhooks: false,
}).process({
data: {
minBlockHeight: initial,
batchSize: batch,
addresses,
codeIdsKeys: WasmCodeService.extractWasmCodeKeys(codeIdsKeys),
},
...includeContract,
})

// Print latest statistics every 100ms.
let printLoaderCount = 0
const printStatistics = () => {
printLoaderCount = (printLoaderCount + 1) % LOADER_MAP.length
process.stdout.write(
`\r${
LOADER_MAP[printLoaderCount]
} Transformed: ${transformed.toLocaleString()}. Event processed/total: ${processed.toLocaleString()}/${total.toLocaleString()}. Computations updated/destroyed: ${computationsUpdated.toLocaleString()}/${computationsDestroyed.toLocaleString()}. Latest block height: ${latestBlockHeight.toLocaleString()}`
)
}
const logInterval = setInterval(printStatistics, 100)
// Allow process to exit even though this interval is alive.
logInterval.unref()

let latestBlockEventIdsSeen: number[] = []
while (processed < total) {
const events = await WasmStateEvent.findAll({
where: {
...addressFilter,
// Since there can be multiple events per block, the fixed batch size
// will likely end up leaving some events in the latest block out of
// this batch. To fix this, repeat the latest block again (>=) excluding
// the events we've already seen.
blockHeight: {
[Op.gte]: latestBlockHeight,
},
...(latestBlockEventIdsSeen.length > 0 && {
id: {
[Op.notIn]: latestBlockEventIdsSeen,
},
}),
},
limit: batch,
order: [['blockHeight', 'ASC']],
...includeContract,
})

// If there are no more events, we're done.
if (events.length === 0) {
break
}

const newLatestBlockHeight = events[events.length - 1].blockHeight

// If the latest block height is the same as the previous latest block
// height, we are still in the same block and should append the event IDs to
// the list instead of replacing it. This will only happen if the batch size
// is smaller than the maximum number of events in any one block. Otherwise,
// we're in a new block and should reset the list.
if (Number(newLatestBlockHeight) === latestBlockHeight) {
latestBlockEventIdsSeen = latestBlockEventIdsSeen.concat(
events.map((event) => event.id)
)
} else {
latestBlockEventIdsSeen = events
.filter((event) => event.blockHeight === newLatestBlockHeight)
.map((event) => event.id)
}

processed += events.length
latestBlockHeight = Number(newLatestBlockHeight)

const transformations =
await WasmStateEventTransformation.transformParsedStateEvents(
events.map((event) => event.asParsedEvent)
)

transformed += transformations.length

const { updated, destroyed } = update
? await updateComputationValidityDependentOnChanges(transformations)
: {
updated: 0,
destroyed: 0,
}

computationsUpdated += updated
computationsDestroyed += destroyed
}

clearInterval(logInterval)
} as any)

printStatistics()
console.log(`\n[${new Date().toISOString()}] Transforming complete.`)

await sequelize.close()
Expand Down
Loading

0 comments on commit be6a77e

Please sign in to comment.