Skip to content

Commit

Permalink
Merge pull request #3428 from OriginTrail/feature/priority-queues
Browse files Browse the repository at this point in the history
[V8] [FEATURE] Command Executor Priority Queue
  • Loading branch information
u-hubar authored Nov 28, 2024
2 parents fb02896 + f13602d commit 3804787
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 60 deletions.
59 changes: 31 additions & 28 deletions src/commands/command-executor.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
COMMAND_STATUS,
DEFAULT_COMMAND_DELAY_IN_MILLS,
COMMAND_QUEUE_PARALLELISM,
DEFAULT_COMMAND_PRIORITY,
} from '../constants/constants.js';

/**
Expand All @@ -20,7 +21,7 @@ class CommandExecutor {
this.repositoryModuleManager = ctx.repositoryModuleManager;
this.verboseLoggingEnabled = ctx.config.commandExecutorVerboseLoggingEnabled;

this.queue = async.queue((command, callback = () => {}) => {
this.queue = async.priorityQueue((command, callback = () => {}) => {
this._execute(command)
.then((result) => {
callback(result);
Expand Down Expand Up @@ -81,7 +82,7 @@ class CommandExecutor {
commandId: command.id,
commandName: command.name,
};
if (command.data && command.data.operationId) {
if (command.data?.operationId !== undefined) {
commandContext.operationId = command.data.operationId;
}
const loggerWithContext = this.logger.child(commandContext);
Expand All @@ -106,7 +107,7 @@ class CommandExecutor {
});
return;
}
if (command.deadlineAt && now > command.deadlineAt) {
if (command.deadlineAt !== undefined && now > command.deadlineAt) {
loggerWithContext.warn('Command is too late...');
await this._update(command, {
status: COMMAND_STATUS.EXPIRED,
Expand Down Expand Up @@ -280,7 +281,7 @@ class CommandExecutor {
let delay = addDelay ?? 0;

if (delay > MAX_COMMAND_DELAY_IN_MILLS) {
if (command.readyAt == null) {
if (command.readyAt === undefined) {
command.readyAt = Date.now();
}
command.readyAt += delay;
Expand All @@ -290,16 +291,19 @@ class CommandExecutor {
if (insert) {
command = await this._insert(command);
}

const commandPriority = command.priority ?? DEFAULT_COMMAND_PRIORITY;

if (delay) {
setTimeout(
(timeoutCommand) => {
this.queue.push(timeoutCommand);
this.queue.push(timeoutCommand, commandPriority);
},
delay,
command,
);
} else {
this.queue.push(command);
this.queue.push(command, commandPriority);
}
}

Expand All @@ -311,7 +315,7 @@ class CommandExecutor {
*/
async _handleRetry(retryCommand, handler) {
const command = retryCommand;
if (command.retries > 1) {
if (command.retries !== undefined && command.retries > 1) {
command.data = handler.pack(command.data);
await this._update(command, {
status: COMMAND_STATUS.PENDING,
Expand All @@ -337,7 +341,7 @@ class CommandExecutor {
* @private
*/
async _handleError(command, handler, error) {
if (command.retries > 0) {
if (command.retries !== undefined && command.retries > 0) {
await this._update(command, {
retries: command.retries - 1,
});
Expand Down Expand Up @@ -368,33 +372,31 @@ class CommandExecutor {
* @private
*/
async _insert(insertCommand, transaction = null) {
const command = insertCommand;
if (!command.name) {
[command.name] = command.sequence;
command.sequence = command.sequence.slice(1);
}
if (!command.readyAt) {
command.readyAt = Date.now(); // take current time
}
if (command.delay == null) {
command.delay = 0;
}
if (!command.transactional) {
command.transactional = 0;
}
if (!command.data) {
const { sequence, name, readyAt, delay, transactional, data, priority } = insertCommand;

const command = {
...insertCommand,
name: name || sequence?.[0],
sequence: name ? sequence : sequence?.slice(1),
readyAt: readyAt ?? Date.now(),
delay: delay ?? 0,
transactional: transactional ?? 0,
priority: priority ?? DEFAULT_COMMAND_PRIORITY,
status: COMMAND_STATUS.PENDING,
};

if (!data) {
const commandInstance = this.commandResolver.resolve(command.name);
if (commandInstance) {
command.data = commandInstance.pack(command.data);
}
}
command.status = COMMAND_STATUS.PENDING;
const opts = {};
if (transaction != null) {
opts.transaction = transaction;
}

const opts = transaction ? { transaction } : {};
const model = await this.repositoryModuleManager.createCommand(command, opts);

command.id = model.id;

return command;
}

Expand Down Expand Up @@ -462,6 +464,7 @@ class CommandExecutor {
id: commandModel.id,
name: commandModel.name,
data: commandModel.data,
priority: commandModel.priority ?? DEFAULT_COMMAND_PRIORITY,
readyAt: commandModel.readyAt,
delay: commandModel.delay,
startedAt: commandModel.startedAt,
Expand Down
21 changes: 17 additions & 4 deletions src/constants/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ export const BLS_KEY_FILENAME = 'secretKey';

export const TRIPLE_STORE_CONNECT_MAX_RETRIES = 10;

export const COMMAND_PRIORITY = {
HIGHEST: 0,
HIGH: 1,
MEDIUM: 5,
LOW: 10,
LOWEST: 20,
};

export const DEFAULT_COMMAND_PRIORITY = COMMAND_PRIORITY.MEDIUM;

export const DEFAULT_BLOCKCHAIN_EVENT_SYNC_PERIOD_IN_MILLS = 15 * 24 * 60 * 60 * 1000; // 15 days

export const MAX_BLOCKCHAIN_EVENT_SYNC_OF_HISTORICAL_BLOCKS_IN_MILLS = 60 * 60 * 1000; // 1 hour
Expand Down Expand Up @@ -211,18 +221,21 @@ export const DEFAULT_COMMAND_REPEAT_INTERVAL_IN_MILLS = 5000; // 5 seconds
export const DEFAULT_COMMAND_DELAY_IN_MILLS = 60 * 1000; // 60 seconds

export const TRANSACTION_PRIORITY = {
HIGHEST: 0,
HIGH: 1,
REGULAR: 2,
MEDIUM: 5,
LOW: 10,
LOWEST: 20,
};

export const CONTRACT_FUNCTION_PRIORITY = {
'submitCommit((address,uint256,bytes,uint8,uint16,uint72,uint72,uint72))':
TRANSACTION_PRIORITY.REGULAR,
'submitCommit((address,uint256,bytes,uint8,uint16))': TRANSACTION_PRIORITY.REGULAR,
TRANSACTION_PRIORITY.MEDIUM,
'submitCommit((address,uint256,bytes,uint8,uint16))': TRANSACTION_PRIORITY.MEDIUM,
'submitUpdateCommit((address,uint256,bytes,uint8,uint16,uint72,uint72,uint72))':
TRANSACTION_PRIORITY.HIGH,
'submitUpdateCommit((address,uint256,bytes,uint8,uint16))': TRANSACTION_PRIORITY.HIGH,
sendProof: TRANSACTION_PRIORITY.REGULAR,
sendProof: TRANSACTION_PRIORITY.MEDIUM,
};

export const COMMAND_RETRIES = {
Expand Down
39 changes: 12 additions & 27 deletions src/modules/blockchain/implementation/web3-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class Web3Service {
initializeTransactionQueues(concurrency = TRANSACTION_QUEUE_CONCURRENCY) {
this.transactionQueues = {};
for (const operationalWallet of this.operationalWallets) {
const transactionQueue = async.queue((args, cb) => {
const transactionQueue = async.priorityQueue((args, cb) => {
const { contractInstance, functionName, transactionArgs, gasPrice } = args;
this._executeContractFunction(
contractInstance,
Expand All @@ -104,33 +104,18 @@ class Web3Service {

queueTransaction(contractInstance, functionName, transactionArgs, callback, gasPrice) {
const selectedQueue = this.selectTransactionQueue();
const priority = CONTRACT_FUNCTION_PRIORITY[functionName] ?? TRANSACTION_PRIORITY.REGULAR;
const priority = CONTRACT_FUNCTION_PRIORITY[functionName] ?? TRANSACTION_PRIORITY.MEDIUM;
this.logger.info(`Calling ${functionName} with priority: ${priority}`);
switch (priority) {
case TRANSACTION_PRIORITY.HIGH:
selectedQueue.unshift(
{
contractInstance,
functionName,
transactionArgs,
gasPrice,
},
callback,
);
break;
case TRANSACTION_PRIORITY.REGULAR:
default:
selectedQueue.push(
{
contractInstance,
functionName,
transactionArgs,
gasPrice,
},
callback,
);
break;
}
selectedQueue.push(
{
contractInstance,
functionName,
transactionArgs,
gasPrice,
},
priority,
callback,
);
}

removeTransactionQueue(walletAddress) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export async function up({ context: { queryInterface, Sequelize } }) {
await queryInterface.addColumn('commands', 'priority', {
type: Sequelize.BIGINT,
});
}

export async function down({ context: { queryInterface } }) {
await queryInterface.removeColumn('commands', 'priority');
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ export default (sequelize, DataTypes) => {
},
name: DataTypes.STRING,
data: DataTypes.JSON,
priority: DataTypes.BIGINT,
sequence: DataTypes.JSON,
readyAt: DataTypes.BIGINT,
delay: DataTypes.BIGINT,
startedAt: DataTypes.BIGINT,
deadlineAt: DataTypes.BIGINT,
period: DataTypes.INTEGER,
period: DataTypes.BIGINT,
status: DataTypes.STRING,
message: DataTypes.TEXT,
parentId: DataTypes.UUID,
Expand Down

0 comments on commit 3804787

Please sign in to comment.