Skip to content

Commit

Permalink
Merge pull request #990 from golemfactory/feature/JST-999/signal-or-t…
Browse files Browse the repository at this point in the history
…imeot-in-rental

SignalOrTimeout param for ResourceRental
  • Loading branch information
mgordel authored Jun 21, 2024
2 parents c769a33 + 8113119 commit 570126b
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 75 deletions.
77 changes: 17 additions & 60 deletions src/activity/exe-unit/exe-unit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ import { ExecutionOptions, ExeScriptExecutor } from "../exe-script-executor";

export type LifecycleFunction = (exe: ExeUnit) => Promise<void>;

const DEFAULTS = {
activityDeployingTimeout: 300_000,
};

export interface ExeUnitOptions {
activityDeployingTimeout?: number;
storageProvider?: StorageProvider;
Expand Down Expand Up @@ -60,8 +56,6 @@ export interface ActivityDTO {
* Groups most common operations that the requestors might need to implement their workflows
*/
export class ExeUnit {
private readonly activityDeployingTimeout: number;

public readonly provider: ProviderInfo;
private readonly logger: Logger;
private readonly storageProvider: StorageProvider;
Expand All @@ -76,8 +70,6 @@ export class ExeUnit {
public readonly activityModule: ActivityModule,
private options?: ExeUnitOptions,
) {
this.activityDeployingTimeout = options?.activityDeployingTimeout || DEFAULTS.activityDeployingTimeout;

this.logger = options?.logger ?? defaultLogger("work");
this.provider = activity.provider;
this.storageProvider = options?.storageProvider ?? new NullStorageProvider();
Expand Down Expand Up @@ -163,60 +155,25 @@ export class ExeUnit {
}

private async deployActivity() {
const result = await this.executor
.execute(
try {
const result = await this.executor.execute(
new Script([new Deploy(this.networkNode?.getNetworkConfig?.()), new Start()]).getExeScriptRequest(),
undefined,
this.activityDeployingTimeout,
)
.catch((e) => {
throw new GolemWorkError(
`Unable to deploy activity. ${e}`,
WorkErrorCode.ActivityDeploymentFailed,
this.activity.agreement,
this.activity,
this.activity.provider,
e,
);
});

let timeoutId: NodeJS.Timeout;

await Promise.race([
new Promise(
(res, rej) =>
(timeoutId = setTimeout(
() => rej(new GolemTimeoutError("Deploing activity has been aborted due to a timeout")),
this.activityDeployingTimeout,
)),
),
(async () => {
for await (const res of result) {
if (res.result === "Error")
throw new GolemWorkError(
`Deploing activity failed. Error: ${res.message}`,
WorkErrorCode.ActivityDeploymentFailed,
this.activity.agreement,
this.activity,
this.activity.provider,
);
}
})(),
])
.catch((error) => {
if (error instanceof GolemWorkError) {
throw error;
);
for await (const res of result) {
if (res.result === "Error") {
throw new Error(res.message);
}
throw new GolemWorkError(
`Deploing activity failed. Error: ${error.toString()}`,
WorkErrorCode.ActivityDeploymentFailed,
this.activity.agreement,
this.activity,
this.activity.provider,
error,
);
})
.finally(() => clearTimeout(timeoutId));
}
} catch (error) {
throw new GolemWorkError(
`Unable to deploy activity. ${error}`,
WorkErrorCode.ActivityDeploymentFailed,
this.activity.agreement,
this.activity,
this.activity.provider,
error,
);
}
}

private async setupActivity() {
Expand Down
40 changes: 34 additions & 6 deletions src/resource-rental/resource-rental.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { Agreement } from "../market/agreement/agreement";
import { AgreementPaymentProcess, PaymentProcessOptions } from "../payment/agreement_payment_process";
import { Logger } from "../shared/utils";
import { createAbortSignalFromTimeout, Logger } from "../shared/utils";
import { waitForCondition } from "../shared/utils/wait";
import { ActivityModule, ExeUnit, ExeUnitOptions } from "../activity";
import { StorageProvider } from "../shared/storage";
import { EventEmitter } from "eventemitter3";
import { NetworkNode } from "../network";
import { ExecutionOptions } from "../activity/exe-script-executor";
import { MarketModule } from "../market";
import { GolemUserError } from "../shared/error/golem-error";
import { GolemAbortError, GolemTimeoutError, GolemUserError } from "../shared/error/golem-error";

export interface ResourceRentalEvents {
/**
Expand Down Expand Up @@ -53,8 +53,10 @@ export class ResourceRental {
* Terminates the activity and agreement (stopping any ongoing work) and finalizes the payment process.
* Resolves once the rental is fully terminated and all pending business operations are finalized.
* If the rental is already finalized, it will resolve immediately.
* @param signalOrTimeout - timeout in milliseconds or an AbortSignal that will be used to cancel the finalization process, especially the payment process.
* Please note that canceling the payment process may fail to comply with the terms of the agreement.
*/
async stopAndFinalize() {
async stopAndFinalize(signalOrTimeout?: number | AbortSignal) {
// Prevent this task from being performed more than once
if (!this.finalizePromise) {
this.finalizePromise = (async () => {
Expand All @@ -74,7 +76,19 @@ export class ResourceRental {
}

this.logger.info("Waiting for payment process of agreement to finish", { agreementId: this.agreement.id });
await waitForCondition(() => this.paymentProcess.isFinished());
const abortSignal = createAbortSignalFromTimeout(signalOrTimeout);
await waitForCondition(() => this.paymentProcess.isFinished(), {
signalOrTimeout: abortSignal,
}).catch((error) => {
this.paymentProcess.stop();
if (error instanceof GolemTimeoutError) {
throw new GolemTimeoutError(
`The finalization of payment process has been aborted due to a timeout`,
abortSignal.reason,
);
}
throw new GolemAbortError("The finalization of payment process has been aborted", abortSignal.reason);
});
this.logger.info("Payment process for agreement finalized", { agreementId: this.agreement.id });
} catch (error) {
this.logger.error("Payment process finalization failed", { agreementId: this.agreement.id, error });
Expand All @@ -93,21 +107,35 @@ export class ResourceRental {

/**
* Creates an activity on the Provider, and returns an exe-unit that can be used to operate within the activity
* @param signalOrTimeout - timeout in milliseconds or an AbortSignal that will be used to cancel the exe-unit request,
* especially when the exe-unit is in the process of starting, deploying and preparing the environment (including setup function)
*/
async getExeUnit(): Promise<ExeUnit> {
async getExeUnit(signalOrTimeout?: number | AbortSignal): Promise<ExeUnit> {
if (this.finalizePromise || this.abortController.signal.aborted) {
throw new GolemUserError("The resource rental is not active. It may have been aborted or finalized");
}
if (this.currentExeUnit) {
return this.currentExeUnit;
}

const abortController = new AbortController();
this.abortController.signal.addEventListener("abort", () =>
abortController.abort(this.abortController.signal.reason),
);
if (signalOrTimeout) {
const abortSignal = createAbortSignalFromTimeout(signalOrTimeout);
abortSignal.addEventListener("abort", () => abortController.abort(abortSignal.reason));
if (signalOrTimeout instanceof AbortSignal && signalOrTimeout.aborted) {
abortController.abort(signalOrTimeout.reason);
}
}

const activity = await this.activityModule.createActivity(this.agreement);
this.currentExeUnit = await this.activityModule.createExeUnit(activity, {
storageProvider: this.storageProvider,
networkNode: this.resourceRentalOptions?.networkNode,
executionOptions: this.resourceRentalOptions?.activity,
signalOrTimeout: this.abortController.signal,
signalOrTimeout: abortController.signal,
...this.resourceRentalOptions?.exeUnit,
});

Expand Down
25 changes: 16 additions & 9 deletions src/shared/utils/wait.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import { GolemTimeoutError } from "../error/golem-error";
import { GolemAbortError, GolemTimeoutError } from "../error/golem-error";
import { createAbortSignalFromTimeout } from "./abortSignal";

/**
* Utility function that helps to block the execution until a condition is met (check returns true) or the timeout happens.
*
* @param {function} check - The function checking if the condition is met.
* @param {Object} [opts] - Options controlling the timeout and check interval in seconds.
* @param {number} [opts.timeoutSeconds=30] - The timeout value in seconds.
* @param {number|AbortSignal} [opts.signalOrTimeout=30_000] - The timeout value in milliseconds or an AbortSignal.
* @param {number} [opts.intervalSeconds=1] - The interval between condition checks in seconds.
*
* @return {Promise<void>} - Resolves when the condition is met, or rejects with a timeout error if it wasn't met on time or with an abort error if the signal was aborted.
*/
export function waitForCondition(
check: () => boolean | Promise<boolean>,
opts?: { timeoutSeconds?: number; intervalSeconds?: number },
opts?: { signalOrTimeout?: number | AbortSignal; intervalSeconds?: number },
): Promise<void> {
const timeoutSeconds = opts?.timeoutSeconds ?? 30;
const abortSignal = createAbortSignalFromTimeout(opts?.signalOrTimeout ?? 30_000);
const intervalSeconds = opts?.intervalSeconds ?? 1;
let verifyInterval: NodeJS.Timeout | undefined;
let waitTimeout: NodeJS.Timeout | undefined;

const verify = new Promise<void>((resolve) => {
verifyInterval = setInterval(async () => {
Expand All @@ -28,14 +28,21 @@ export function waitForCondition(
});

const wait = new Promise<void>((_, reject) => {
waitTimeout = setTimeout(() => {
reject(new GolemTimeoutError(`Condition was not met within ${timeoutSeconds}s`));
}, timeoutSeconds * 1000);
const abortError = new GolemAbortError("Waiting for a condition has been aborted", abortSignal.reason);
if (abortSignal.aborted) {
return reject(abortError);
}
abortSignal.addEventListener("abort", () =>
reject(
abortSignal.reason.name === "TimeoutError"
? new GolemTimeoutError(`Waiting for a condition has been aborted due to a timeout`, abortSignal.reason)
: abortError,
),
);
});

return Promise.race([verify, wait]).finally(() => {
clearInterval(verifyInterval);
clearTimeout(waitTimeout);
});
}

Expand Down
36 changes: 36 additions & 0 deletions tests/e2e/resourceRentalPool.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,40 @@ describe("ResourceRentalPool", () => {
);
});
});

it("should abort getting exe-unit by timeout", async () => {
const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { replicas: 1 });
const rental = await pool.acquire();
await expect(rental.getExeUnit(10)).rejects.toThrow(
new GolemAbortError("Initializing of the exe-unit has been aborted due to a timeout"),
);
});

it("should abort getting exe-unit by signal", async () => {
const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { replicas: 1 });
const abortController = new AbortController();
const rental = await pool.acquire();
abortController.abort();
await expect(rental.getExeUnit(abortController.signal)).rejects.toThrow(
new GolemAbortError("Initializing of the exe-unit has been aborted"),
);
});

it("should abort finalizing resource rental by timeout", async () => {
const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { replicas: 1 });
const rental = await pool.acquire();
await expect(rental.stopAndFinalize(10)).rejects.toThrow(
new GolemAbortError("The finalization of payment process has been aborted due to a timeout"),
);
});

it("should abort finalizing resource rental by signal", async () => {
const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { replicas: 1 });
const abortController = new AbortController();
const rental = await pool.acquire();
abortController.abort();
await expect(rental.stopAndFinalize(abortController.signal)).rejects.toThrow(
new GolemAbortError("The finalization of payment process has been aborted"),
);
});
});

0 comments on commit 570126b

Please sign in to comment.