diff --git a/src/executor/executor.ts b/src/executor/executor.ts index 8d6c75f3f..4f5a7c728 100644 --- a/src/executor/executor.ts +++ b/src/executor/executor.ts @@ -54,6 +54,15 @@ export type ExecutorOptions = { * For more details see {@link JobStorage}. Defaults to a simple in-memory storage. */ jobStorage?: JobStorage; + /** + * Do not install signal handlers for SIGINT, SIGTERM, SIGBREAK, SIGHUP. + * + * By default, TaskExecutor will install those and terminate itself when any of those signals is received. + * This is to make sure proper shutdown with completed invoice payments. + * + * Note: If you decide to set this to `true`, you will be responsible for proper shutdown of task executor. + */ + skipProcessSignals?: boolean; /** * Timeout for waiting for at least one offer from the market. * This parameter (set to 30 sec by default) will throw an error when executing `TaskExecutor.run` @@ -61,7 +70,7 @@ export type ExecutorOptions = { * You can set a slightly higher time in a situation where your parameters such as proposalFilter * or minimum hardware requirements are quite restrictive and finding a suitable provider * that meets these criteria may take a bit longer. - * */ + */ startupTimeout?: number; } & Omit & MarketOptions & @@ -103,6 +112,8 @@ export class TaskExecutor { private startupTimeoutId?: NodeJS.Timeout; private yagna: Yagna; + private signalHandler = (signal: string) => this.cancel(signal); + /** * Create a new Task Executor * @description Factory Method that create and initialize an instance of the TaskExecutor @@ -224,7 +235,7 @@ export class TaskExecutor { this.storageProvider?.init(), ]).catch((e) => this.handleCriticalError(e)); this.taskService.run().catch((e) => this.handleCriticalError(e)); - if (runtimeContextChecker.isNode) this.handleCancelEvent(); + if (runtimeContextChecker.isNode) this.installSignalHandlers(); this.options.eventTarget.dispatchEvent(new Events.ComputationStarted()); this.logger?.info( `Task Executor has started using subnet: ${this.options.subnetTag}, network: ${this.paymentService.config.payment.network}, driver: ${this.paymentService.config.payment.driver}`, @@ -235,7 +246,7 @@ export class TaskExecutor { * Stop all executor services and shut down executor instance */ async end() { - if (runtimeContextChecker.isNode) this.removeCancelEvent(); + if (runtimeContextChecker.isNode) this.removeSignalHandlers(); if (!this.isRunning) return; this.isRunning = false; clearTimeout(this.startupTimeoutId); @@ -462,18 +473,24 @@ export class TaskExecutor { this.end().catch((e) => this.logger?.error(e)); } - private handleCancelEvent() { - terminatingSignals.forEach((event) => process.on(event, () => this.cancel(event))); + private installSignalHandlers() { + if (this.configOptions.skipProcessSignals) return; + terminatingSignals.forEach((event) => { + process.on(event, this.signalHandler); + }); } - private removeCancelEvent() { - terminatingSignals.forEach((event) => process.removeAllListeners(event)); + private removeSignalHandlers() { + if (this.configOptions.skipProcessSignals) return; + terminatingSignals.forEach((event) => { + process.removeListener(event, this.signalHandler); + }); } public async cancel(reason?: string) { try { if (this.isCanceled) return; - if (runtimeContextChecker.isNode) this.removeCancelEvent(); + if (runtimeContextChecker.isNode) this.removeSignalHandlers(); const message = `Executor has interrupted by the user. Reason: ${reason}.`; this.logger?.warn(`${message}. Stopping all tasks...`); this.isCanceled = true;