Skip to content

Commit

Permalink
Parallel batches requests
Browse files Browse the repository at this point in the history
  • Loading branch information
Albermonte committed Jun 11, 2024
1 parent b0ea194 commit ea6a4b5
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 18 deletions.
2 changes: 1 addition & 1 deletion nuxt.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export default defineNuxtConfig({
},

scheduledTasks: {
'20 */13 * * *': ['fetch']
'0 */12 * * *': ['fetch']
}
},

Expand Down
80 changes: 68 additions & 12 deletions packages/nimiq-vts/src/fetcher.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Client, type ElectionMacroBlock, InherentType } from "nimiq-rpc-client-ts"
import type { Activity, ValidatorActivity, ValidatorsActivities } from "./types"
import type { ValidatorActivity } from "./types"
import { getPolicyConstants } from "./utils"

/**
Expand Down Expand Up @@ -28,21 +28,77 @@ export async function fetchValidatorsActivitiesInEpoch(client: Client, blockNumb
validatorsActivity[validator] = { likelihood, missed: 0, rewarded: 0 }
}

// Then, fetch each batch in the epoch and update the activity
for (let i = 0; i < batchesPerEpoch; i++) {
const { data: inherents, error: errorBatch } = await client.blockchain.getInherentsByBatchNumber(firstBatchIndex + i)
if (errorBatch || !inherents) throw new Error(JSON.stringify({ blockNumber, errorBatch, i, firstBatchIndex, currentIndex: firstBatchIndex + i }))
const maxBatchSize = 120;
const minBatchSize = 10;
let batchSize = maxBatchSize;
for (let i = 0; i < batchesPerEpoch; i += batchSize) {
let batchPromises = Array.from({ length: Math.min(batchSize, batchesPerEpoch - i) }, (_, j) => createPromise(i + j));

let results = await Promise.allSettled(batchPromises);

let rejectedIndexes: number[] = results.reduce((acc: number[], result, index) => {
if (result.status === 'rejected') {
acc.push(index);
}
return acc;
}, []);

if (rejectedIndexes.length > 0) {
// Lowering the batch size to prevent more rejections
batchSize = Math.max(minBatchSize, Math.floor(batchSize / 2))
console.log(`Decreasing batch size to ${batchSize}`)
} else {
batchSize = Math.min(maxBatchSize, Math.floor(batchSize + batchSize / 2))
if(batchSize !== maxBatchSize)
console.log(`Increasing batch size to ${batchSize}`)
}

while (rejectedIndexes.length > 0) {
let retryPromises = rejectedIndexes.map(index => createPromise(i + index));
console.log(`Retrying ${rejectedIndexes.length} batches for block ${blockNumber}`);
results = await Promise.allSettled(retryPromises);

for (const { type, validatorAddress } of inherents) {
if (validatorAddress === 'NQ07 0000 0000 0000 0000 0000 0000 0000 0000') continue
// TODO Add comment why this case can happen. e.g address: NQ57 M1NT JRQA FGD2 in election block 3075210
if(!validatorsActivity[validatorAddress]) continue
validatorsActivity[validatorAddress].rewarded += type === InherentType.Reward ? 1 : 0
validatorsActivity[validatorAddress].missed += [InherentType.Penalize, InherentType.Jail].includes(type) ? 1 : 0
// TODO Maybe there are more states we need to consider
rejectedIndexes = results.reduce((acc: number[], result, index) => {
if (result.status === 'rejected') {
acc.push(rejectedIndexes[index]);
}
return acc;
}, []);
}
}

function createPromise(index: number) {
return new Promise<void>(async (resolve, reject) => {
const { data: inherents, error: errorBatch } = await client.blockchain.getInherentsByBatchNumber(firstBatchIndex + index)
if (errorBatch || !inherents) {
reject(JSON.stringify({ blockNumber, errorBatch, index, firstBatchIndex, currentIndex: firstBatchIndex + index }));
} else {
for (const { type, validatorAddress } of inherents) {
if (validatorAddress === 'NQ07 0000 0000 0000 0000 0000 0000 0000 0000') continue
if (!validatorsActivity[validatorAddress]) continue
validatorsActivity[validatorAddress].rewarded += type === InherentType.Reward ? 1 : 0
validatorsActivity[validatorAddress].missed += [InherentType.Penalize, InherentType.Jail].includes(type) ? 1 : 0
}
resolve();
}
});
}

// Then, fetch each batch in the epoch and update the activity
// for (let i = 0; i < batchesPerEpoch; i++) {
// const { data: inherents, error: errorBatch } = await client.blockchain.getInherentsByBatchNumber(firstBatchIndex + i)
// if (errorBatch || !inherents) throw new Error(JSON.stringify({ blockNumber, errorBatch, i, firstBatchIndex, currentIndex: firstBatchIndex + i }))

// for (const { type, validatorAddress } of inherents) {
// if (validatorAddress === 'NQ07 0000 0000 0000 0000 0000 0000 0000 0000') continue
// // TODO Add comment why this case can happen. e.g address: NQ57 M1NT JRQA FGD2 in election block 3075210
// if(!validatorsActivity[validatorAddress]) continue
// validatorsActivity[validatorAddress].rewarded += type === InherentType.Reward ? 1 : 0
// validatorsActivity[validatorAddress].missed += [InherentType.Penalize, InherentType.Jail].includes(type) ? 1 : 0
// // TODO Maybe there are more states we need to consider
// }
// }

const end = globalThis.performance.now()
const seconds = Math.floor((end - start) / 1000)
console.log(`Fetched slots assignation for block ${blockNumber} in ${seconds} seconds.`)
Expand Down
4 changes: 2 additions & 2 deletions server/database/migrations/meta/0000_snapshot.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"version": "6",
"dialect": "sqlite",
"id": "b74dc22e-f8af-4974-95ea-9fc52b9b0c79",
"id": "a8b4c590-767a-454e-9a04-930d60dcf51b",
"prevId": "00000000-0000-0000-0000-000000000000",
"tables": {
"activity": {
Expand Down Expand Up @@ -234,4 +234,4 @@
"tables": {},
"columns": {}
}
}
}
6 changes: 3 additions & 3 deletions server/database/migrations/meta/_journal.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
{
"idx": 0,
"version": "6",
"when": 1717098265951,
"tag": "0000_pale_moonstone",
"when": 1718092093045,
"tag": "0000_orange_gargoyle",
"breakpoints": true
}
]
}
}

0 comments on commit ea6a4b5

Please sign in to comment.