Skip to content

Commit

Permalink
Merge pull request #619 from golemfactory/feature/JST-516
Browse files Browse the repository at this point in the history
Feature/jst 516 - option to disable signals
  • Loading branch information
pgrzy-golem authored Oct 19, 2023
2 parents 17d90fa + 59281af commit b0b8b8b
Showing 1 changed file with 25 additions and 8 deletions.
33 changes: 25 additions & 8 deletions src/executor/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,23 @@ export type ExecutorOptions = {
* For more details see {@link JobStorage}. Defaults to a simple in-memory storage.
*/
jobStorage?: JobStorage;
/**
* Do not install signal handlers for SIGINT, SIGTERM, SIGBREAK, SIGHUP.
*
* By default, TaskExecutor will install those and terminate itself when any of those signals is received.
* This is to make sure proper shutdown with completed invoice payments.
*
* Note: If you decide to set this to `true`, you will be responsible for proper shutdown of task executor.
*/
skipProcessSignals?: boolean;
/**
* Timeout for waiting for at least one offer from the market.
* This parameter (set to 30 sec by default) will throw an error when executing `TaskExecutor.run`
* if no offer from the market is accepted before this time.
* You can set a slightly higher time in a situation where your parameters such as proposalFilter
* or minimum hardware requirements are quite restrictive and finding a suitable provider
* that meets these criteria may take a bit longer.
* */
*/
startupTimeout?: number;
} & Omit<PackageOptions, "imageHash" | "imageTag"> &
MarketOptions &
Expand Down Expand Up @@ -103,6 +112,8 @@ export class TaskExecutor {
private startupTimeoutId?: NodeJS.Timeout;
private yagna: Yagna;

private signalHandler = (signal: string) => this.cancel(signal);

/**
* Create a new Task Executor
* @description Factory Method that create and initialize an instance of the TaskExecutor
Expand Down Expand Up @@ -224,7 +235,7 @@ export class TaskExecutor {
this.storageProvider?.init(),
]).catch((e) => this.handleCriticalError(e));
this.taskService.run().catch((e) => this.handleCriticalError(e));
if (runtimeContextChecker.isNode) this.handleCancelEvent();
if (runtimeContextChecker.isNode) this.installSignalHandlers();
this.options.eventTarget.dispatchEvent(new Events.ComputationStarted());
this.logger?.info(
`Task Executor has started using subnet: ${this.options.subnetTag}, network: ${this.paymentService.config.payment.network}, driver: ${this.paymentService.config.payment.driver}`,
Expand All @@ -235,7 +246,7 @@ export class TaskExecutor {
* Stop all executor services and shut down executor instance
*/
async end() {
if (runtimeContextChecker.isNode) this.removeCancelEvent();
if (runtimeContextChecker.isNode) this.removeSignalHandlers();
if (!this.isRunning) return;
this.isRunning = false;
clearTimeout(this.startupTimeoutId);
Expand Down Expand Up @@ -462,18 +473,24 @@ export class TaskExecutor {
this.end().catch((e) => this.logger?.error(e));
}

private handleCancelEvent() {
terminatingSignals.forEach((event) => process.on(event, () => this.cancel(event)));
private installSignalHandlers() {
if (this.configOptions.skipProcessSignals) return;
terminatingSignals.forEach((event) => {
process.on(event, this.signalHandler);
});
}

private removeCancelEvent() {
terminatingSignals.forEach((event) => process.removeAllListeners(event));
private removeSignalHandlers() {
if (this.configOptions.skipProcessSignals) return;
terminatingSignals.forEach((event) => {
process.removeListener(event, this.signalHandler);
});
}

public async cancel(reason?: string) {
try {
if (this.isCanceled) return;
if (runtimeContextChecker.isNode) this.removeCancelEvent();
if (runtimeContextChecker.isNode) this.removeSignalHandlers();
const message = `Executor has interrupted by the user. Reason: ${reason}.`;
this.logger?.warn(`${message}. Stopping all tasks...`);
this.isCanceled = true;
Expand Down

0 comments on commit b0b8b8b

Please sign in to comment.