From a065dd7139c42bd803db4a6a3461b4a790029f4a Mon Sep 17 00:00:00 2001 From: Djordje Kovacevic Date: Thu, 21 Sep 2023 10:55:43 +0200 Subject: [PATCH] Updated command executor pause/resume logic --- ot-node.js | 23 +++++++++++++++++++---- src/commands/command-executor.js | 28 ++++++++++++++++++---------- 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/ot-node.js b/ot-node.js index 5e89a6b99f..fa68cfc2af 100644 --- a/ot-node.js +++ b/ot-node.js @@ -63,13 +63,14 @@ class OTNode { await this.createProfiles(); + await this.initializeCommandExecutor(); await this.initializeShardingTableService(); await this.initializeTelemetryInjectionService(); await this.initializeBlockchainEventListenerService(); - await this.initializeCommandExecutor(); await this.initializeRouters(); await this.startNetworkModule(); + this.resumeCommandExecutor(); this.logger.info('Node is up and running!'); } @@ -244,9 +245,11 @@ class OTNode { async initializeCommandExecutor() { try { const commandExecutor = this.container.resolve('commandExecutor'); - await commandExecutor.init(); - commandExecutor.replay(); - await commandExecutor.start(); + commandExecutor.pauseQueue(); + await commandExecutor.addDefaultCommands(); + commandExecutor + .replayOldCommands() + .then(() => this.logger.info('Finished replaying old commands')); } catch (e) { this.logger.error( `Command executor initialization failed. Error message: ${e.message}`, @@ -255,6 +258,18 @@ class OTNode { } } + resumeCommandExecutor() { + try { + const commandExecutor = this.container.resolve('commandExecutor'); + commandExecutor.resumeQueue(); + } catch (e) { + this.logger.error( + `Unable to resume command executor queue. Error message: ${e.message}`, + ); + this.stop(1); + } + } + async startNetworkModule() { const networkModuleManager = this.container.resolve('networkModuleManager'); await networkModuleManager.start(); diff --git a/src/commands/command-executor.js b/src/commands/command-executor.js index e62e042acb..9f2a85933a 100644 --- a/src/commands/command-executor.js +++ b/src/commands/command-executor.js @@ -16,7 +16,6 @@ class CommandExecutor { constructor(ctx) { this.logger = ctx.logger; this.commandResolver = ctx.commandResolver; - this.started = false; this.repositoryModuleManager = ctx.repositoryModuleManager; this.verboseLoggingEnabled = ctx.config.commandExecutorVerboseLoggingEnabled; @@ -39,8 +38,8 @@ class CommandExecutor { * Initialize executor * @returns {Promise} */ - async init() { - await Promise.all(PERMANENT_COMMANDS.map((command) => this._startDefaultCommand(command))); + async addDefaultCommands() { + await Promise.all(PERMANENT_COMMANDS.map((command) => this._addDefaultCommand(command))); if (this.verboseLoggingEnabled) { this.logger.trace('Command executor has been initialized...'); @@ -48,14 +47,23 @@ class CommandExecutor { } /** - * Starts the command executor - * @return {Promise} + * Resumes the command executor queue + */ + resumeQueue() { + if (this.verboseLoggingEnabled) { + this.logger.trace('Command executor queue has been resumed...'); + } + this.queue.resume(); + } + + /** + * Pause the command executor queue */ - async start() { - this.started = true; + pauseQueue() { if (this.verboseLoggingEnabled) { - this.logger.trace('Command executor has been started...'); + this.logger.trace('Command executor queue has been paused...'); } + this.queue.pause(); } /** @@ -224,7 +232,7 @@ class CommandExecutor { * @return {Promise} * @private */ - async _startDefaultCommand(name) { + async _addDefaultCommand(name) { await this._delete(name); const handler = this.commandResolver.resolve(name); if (!handler) { @@ -398,7 +406,7 @@ class CommandExecutor { * Replays pending commands from the database * @returns {Promise} */ - async replay() { + async replayOldCommands() { this.logger.info('Replay pending/started commands from the database...'); const pendingCommands = await this.repositoryModuleManager.getCommandsWithStatus( [COMMAND_STATUS.PENDING, COMMAND_STATUS.STARTED, COMMAND_STATUS.REPEATING],