Skip to content

Commit

Permalink
Updated command executor pause/resume logic
Browse files Browse the repository at this point in the history
  • Loading branch information
djordjekovac committed Sep 21, 2023
1 parent 0de468f commit a065dd7
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 14 deletions.
23 changes: 19 additions & 4 deletions ot-node.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@ class OTNode {

await this.createProfiles();

await this.initializeCommandExecutor();
await this.initializeShardingTableService();
await this.initializeTelemetryInjectionService();
await this.initializeBlockchainEventListenerService();

await this.initializeCommandExecutor();
await this.initializeRouters();
await this.startNetworkModule();
this.resumeCommandExecutor();
this.logger.info('Node is up and running!');
}

Expand Down Expand Up @@ -244,9 +245,11 @@ class OTNode {
async initializeCommandExecutor() {
try {
const commandExecutor = this.container.resolve('commandExecutor');
await commandExecutor.init();
commandExecutor.replay();
await commandExecutor.start();
commandExecutor.pauseQueue();
await commandExecutor.addDefaultCommands();
commandExecutor
.replayOldCommands()
.then(() => this.logger.info('Finished replaying old commands'));
} catch (e) {
this.logger.error(
`Command executor initialization failed. Error message: ${e.message}`,
Expand All @@ -255,6 +258,18 @@ class OTNode {
}
}

resumeCommandExecutor() {
try {
const commandExecutor = this.container.resolve('commandExecutor');
commandExecutor.resumeQueue();
} catch (e) {
this.logger.error(
`Unable to resume command executor queue. Error message: ${e.message}`,
);
this.stop(1);
}
}

async startNetworkModule() {
const networkModuleManager = this.container.resolve('networkModuleManager');
await networkModuleManager.start();
Expand Down
28 changes: 18 additions & 10 deletions src/commands/command-executor.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ class CommandExecutor {
constructor(ctx) {
this.logger = ctx.logger;
this.commandResolver = ctx.commandResolver;
this.started = false;

this.repositoryModuleManager = ctx.repositoryModuleManager;
this.verboseLoggingEnabled = ctx.config.commandExecutorVerboseLoggingEnabled;
Expand All @@ -39,23 +38,32 @@ class CommandExecutor {
* Initialize executor
* @returns {Promise<void>}
*/
async init() {
await Promise.all(PERMANENT_COMMANDS.map((command) => this._startDefaultCommand(command)));
async addDefaultCommands() {
await Promise.all(PERMANENT_COMMANDS.map((command) => this._addDefaultCommand(command)));

if (this.verboseLoggingEnabled) {
this.logger.trace('Command executor has been initialized...');
}
}

/**
* Starts the command executor
* @return {Promise<void>}
* Resumes the command executor queue
*/
resumeQueue() {
if (this.verboseLoggingEnabled) {
this.logger.trace('Command executor queue has been resumed...');
}
this.queue.resume();
}

/**
* Pause the command executor queue
*/
async start() {
this.started = true;
pauseQueue() {
if (this.verboseLoggingEnabled) {
this.logger.trace('Command executor has been started...');
this.logger.trace('Command executor queue has been paused...');
}
this.queue.pause();
}

/**
Expand Down Expand Up @@ -224,7 +232,7 @@ class CommandExecutor {
* @return {Promise<void>}
* @private
*/
async _startDefaultCommand(name) {
async _addDefaultCommand(name) {
await this._delete(name);
const handler = this.commandResolver.resolve(name);
if (!handler) {
Expand Down Expand Up @@ -398,7 +406,7 @@ class CommandExecutor {
* Replays pending commands from the database
* @returns {Promise<void>}
*/
async replay() {
async replayOldCommands() {
this.logger.info('Replay pending/started commands from the database...');
const pendingCommands = await this.repositoryModuleManager.getCommandsWithStatus(
[COMMAND_STATUS.PENDING, COMMAND_STATUS.STARTED, COMMAND_STATUS.REPEATING],
Expand Down

0 comments on commit a065dd7

Please sign in to comment.