diff --git a/src/commands/command-executor.js b/src/commands/command-executor.js index 17def90862..b7f14e436c 100644 --- a/src/commands/command-executor.js +++ b/src/commands/command-executor.js @@ -7,6 +7,7 @@ import { COMMAND_STATUS, DEFAULT_COMMAND_DELAY_IN_MILLS, COMMAND_QUEUE_PARALLELISM, + DEFAULT_COMMAND_PRIORITY, } from '../constants/constants.js'; /** @@ -20,7 +21,7 @@ class CommandExecutor { this.repositoryModuleManager = ctx.repositoryModuleManager; this.verboseLoggingEnabled = ctx.config.commandExecutorVerboseLoggingEnabled; - this.queue = async.queue((command, callback = () => {}) => { + this.queue = async.priorityQueue((command, callback = () => {}) => { this._execute(command) .then((result) => { callback(result); @@ -81,7 +82,7 @@ class CommandExecutor { commandId: command.id, commandName: command.name, }; - if (command.data && command.data.operationId) { + if (command.data?.operationId !== undefined) { commandContext.operationId = command.data.operationId; } const loggerWithContext = this.logger.child(commandContext); @@ -106,7 +107,7 @@ class CommandExecutor { }); return; } - if (command.deadlineAt && now > command.deadlineAt) { + if (command.deadlineAt !== undefined && now > command.deadlineAt) { loggerWithContext.warn('Command is too late...'); await this._update(command, { status: COMMAND_STATUS.EXPIRED, @@ -280,7 +281,7 @@ class CommandExecutor { let delay = addDelay ?? 0; if (delay > MAX_COMMAND_DELAY_IN_MILLS) { - if (command.readyAt == null) { + if (command.readyAt === undefined) { command.readyAt = Date.now(); } command.readyAt += delay; @@ -290,16 +291,19 @@ class CommandExecutor { if (insert) { command = await this._insert(command); } + + const commandPriority = command.priority ?? DEFAULT_COMMAND_PRIORITY; + if (delay) { setTimeout( (timeoutCommand) => { - this.queue.push(timeoutCommand); + this.queue.push(timeoutCommand, commandPriority); }, delay, command, ); } else { - this.queue.push(command); + this.queue.push(command, commandPriority); } } @@ -311,7 +315,7 @@ class CommandExecutor { */ async _handleRetry(retryCommand, handler) { const command = retryCommand; - if (command.retries > 1) { + if (command.retries !== undefined && command.retries > 1) { command.data = handler.pack(command.data); await this._update(command, { status: COMMAND_STATUS.PENDING, @@ -337,7 +341,7 @@ class CommandExecutor { * @private */ async _handleError(command, handler, error) { - if (command.retries > 0) { + if (command.retries !== undefined && command.retries > 0) { await this._update(command, { retries: command.retries - 1, }); @@ -368,33 +372,31 @@ class CommandExecutor { * @private */ async _insert(insertCommand, transaction = null) { - const command = insertCommand; - if (!command.name) { - [command.name] = command.sequence; - command.sequence = command.sequence.slice(1); - } - if (!command.readyAt) { - command.readyAt = Date.now(); // take current time - } - if (command.delay == null) { - command.delay = 0; - } - if (!command.transactional) { - command.transactional = 0; - } - if (!command.data) { + const { sequence, name, readyAt, delay, transactional, data, priority } = insertCommand; + + const command = { + ...insertCommand, + name: name || sequence?.[0], + sequence: name ? sequence : sequence?.slice(1), + readyAt: readyAt ?? Date.now(), + delay: delay ?? 0, + transactional: transactional ?? 0, + priority: priority ?? DEFAULT_COMMAND_PRIORITY, + status: COMMAND_STATUS.PENDING, + }; + + if (!data) { const commandInstance = this.commandResolver.resolve(command.name); if (commandInstance) { command.data = commandInstance.pack(command.data); } } - command.status = COMMAND_STATUS.PENDING; - const opts = {}; - if (transaction != null) { - opts.transaction = transaction; - } + + const opts = transaction ? { transaction } : {}; const model = await this.repositoryModuleManager.createCommand(command, opts); + command.id = model.id; + return command; } @@ -462,6 +464,7 @@ class CommandExecutor { id: commandModel.id, name: commandModel.name, data: commandModel.data, + priority: commandModel.priority ?? DEFAULT_COMMAND_PRIORITY, readyAt: commandModel.readyAt, delay: commandModel.delay, startedAt: commandModel.startedAt, diff --git a/src/constants/constants.js b/src/constants/constants.js index 086f0daa9b..2dcc618b66 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -68,6 +68,16 @@ export const BLS_KEY_FILENAME = 'secretKey'; export const TRIPLE_STORE_CONNECT_MAX_RETRIES = 10; +export const COMMAND_PRIORITY = { + HIGHEST: 0, + HIGH: 1, + MEDIUM: 5, + LOW: 10, + LOWEST: 20, +}; + +export const DEFAULT_COMMAND_PRIORITY = COMMAND_PRIORITY.MEDIUM; + export const DEFAULT_BLOCKCHAIN_EVENT_SYNC_PERIOD_IN_MILLS = 15 * 24 * 60 * 60 * 1000; // 15 days export const MAX_BLOCKCHAIN_EVENT_SYNC_OF_HISTORICAL_BLOCKS_IN_MILLS = 60 * 60 * 1000; // 1 hour @@ -211,18 +221,21 @@ export const DEFAULT_COMMAND_REPEAT_INTERVAL_IN_MILLS = 5000; // 5 seconds export const DEFAULT_COMMAND_DELAY_IN_MILLS = 60 * 1000; // 60 seconds export const TRANSACTION_PRIORITY = { + HIGHEST: 0, HIGH: 1, - REGULAR: 2, + MEDIUM: 5, + LOW: 10, + LOWEST: 20, }; export const CONTRACT_FUNCTION_PRIORITY = { 'submitCommit((address,uint256,bytes,uint8,uint16,uint72,uint72,uint72))': - TRANSACTION_PRIORITY.REGULAR, - 'submitCommit((address,uint256,bytes,uint8,uint16))': TRANSACTION_PRIORITY.REGULAR, + TRANSACTION_PRIORITY.MEDIUM, + 'submitCommit((address,uint256,bytes,uint8,uint16))': TRANSACTION_PRIORITY.MEDIUM, 'submitUpdateCommit((address,uint256,bytes,uint8,uint16,uint72,uint72,uint72))': TRANSACTION_PRIORITY.HIGH, 'submitUpdateCommit((address,uint256,bytes,uint8,uint16))': TRANSACTION_PRIORITY.HIGH, - sendProof: TRANSACTION_PRIORITY.REGULAR, + sendProof: TRANSACTION_PRIORITY.MEDIUM, }; export const COMMAND_RETRIES = { diff --git a/src/modules/blockchain/implementation/web3-service.js b/src/modules/blockchain/implementation/web3-service.js index d86bfab1ac..0da44b57f3 100644 --- a/src/modules/blockchain/implementation/web3-service.js +++ b/src/modules/blockchain/implementation/web3-service.js @@ -81,7 +81,7 @@ class Web3Service { initializeTransactionQueues(concurrency = TRANSACTION_QUEUE_CONCURRENCY) { this.transactionQueues = {}; for (const operationalWallet of this.operationalWallets) { - const transactionQueue = async.queue((args, cb) => { + const transactionQueue = async.priorityQueue((args, cb) => { const { contractInstance, functionName, transactionArgs, gasPrice } = args; this._executeContractFunction( contractInstance, @@ -104,33 +104,18 @@ class Web3Service { queueTransaction(contractInstance, functionName, transactionArgs, callback, gasPrice) { const selectedQueue = this.selectTransactionQueue(); - const priority = CONTRACT_FUNCTION_PRIORITY[functionName] ?? TRANSACTION_PRIORITY.REGULAR; + const priority = CONTRACT_FUNCTION_PRIORITY[functionName] ?? TRANSACTION_PRIORITY.MEDIUM; this.logger.info(`Calling ${functionName} with priority: ${priority}`); - switch (priority) { - case TRANSACTION_PRIORITY.HIGH: - selectedQueue.unshift( - { - contractInstance, - functionName, - transactionArgs, - gasPrice, - }, - callback, - ); - break; - case TRANSACTION_PRIORITY.REGULAR: - default: - selectedQueue.push( - { - contractInstance, - functionName, - transactionArgs, - gasPrice, - }, - callback, - ); - break; - } + selectedQueue.push( + { + contractInstance, + functionName, + transactionArgs, + gasPrice, + }, + priority, + callback, + ); } removeTransactionQueue(walletAddress) { diff --git a/src/modules/repository/implementation/sequelize/migrations/20241126114400-add-commands-priority.js b/src/modules/repository/implementation/sequelize/migrations/20241126114400-add-commands-priority.js new file mode 100644 index 0000000000..09e969fb8c --- /dev/null +++ b/src/modules/repository/implementation/sequelize/migrations/20241126114400-add-commands-priority.js @@ -0,0 +1,9 @@ +export async function up({ context: { queryInterface, Sequelize } }) { + await queryInterface.addColumn('commands', 'priority', { + type: Sequelize.BIGINT, + }); +} + +export async function down({ context: { queryInterface } }) { + await queryInterface.removeColumn('commands', 'priority'); +} diff --git a/src/modules/repository/implementation/sequelize/models/commands.js b/src/modules/repository/implementation/sequelize/models/commands.js index dde59ba7f0..e8c48316b4 100644 --- a/src/modules/repository/implementation/sequelize/models/commands.js +++ b/src/modules/repository/implementation/sequelize/models/commands.js @@ -17,12 +17,13 @@ export default (sequelize, DataTypes) => { }, name: DataTypes.STRING, data: DataTypes.JSON, + priority: DataTypes.BIGINT, sequence: DataTypes.JSON, readyAt: DataTypes.BIGINT, delay: DataTypes.BIGINT, startedAt: DataTypes.BIGINT, deadlineAt: DataTypes.BIGINT, - period: DataTypes.INTEGER, + period: DataTypes.BIGINT, status: DataTypes.STRING, message: DataTypes.TEXT, parentId: DataTypes.UUID,