From af76abd8f4614841261e7f2eb06c3dad5c7f17ee Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Thu, 20 Jun 2024 18:32:28 +0200 Subject: [PATCH 1/5] feat: added `signalOrTimeout` param for `getExeUnit()` and `stopAndFinalize()` in `ResourceRental` --- src/activity/exe-unit/exe-unit.ts | 77 ++++++-------------------- src/resource-rental/resource-rental.ts | 41 ++++++++++++-- src/shared/utils/wait.ts | 32 ++++++++--- tests/e2e/resourceRentalPool.spec.ts | 37 +++++++++++++ 4 files changed, 112 insertions(+), 75 deletions(-) diff --git a/src/activity/exe-unit/exe-unit.ts b/src/activity/exe-unit/exe-unit.ts index c32b528f3..203cc941a 100644 --- a/src/activity/exe-unit/exe-unit.ts +++ b/src/activity/exe-unit/exe-unit.ts @@ -25,10 +25,6 @@ import { ExecutionOptions, ExeScriptExecutor } from "../exe-script-executor"; export type LifecycleFunction = (exe: ExeUnit) => Promise; -const DEFAULTS = { - activityDeployingTimeout: 300_000, -}; - export interface ExeUnitOptions { activityDeployingTimeout?: number; storageProvider?: StorageProvider; @@ -60,8 +56,6 @@ export interface ActivityDTO { * Groups most common operations that the requestors might need to implement their workflows */ export class ExeUnit { - private readonly activityDeployingTimeout: number; - public readonly provider: ProviderInfo; private readonly logger: Logger; private readonly storageProvider: StorageProvider; @@ -76,8 +70,6 @@ export class ExeUnit { public readonly activityModule: ActivityModule, private options?: ExeUnitOptions, ) { - this.activityDeployingTimeout = options?.activityDeployingTimeout || DEFAULTS.activityDeployingTimeout; - this.logger = options?.logger ?? defaultLogger("work"); this.provider = activity.provider; this.storageProvider = options?.storageProvider ?? new NullStorageProvider(); @@ -163,60 +155,25 @@ export class ExeUnit { } private async deployActivity() { - const result = await this.executor - .execute( + try { + const result = await this.executor.execute( new Script([new Deploy(this.networkNode?.getNetworkConfig?.()), new Start()]).getExeScriptRequest(), - undefined, - this.activityDeployingTimeout, - ) - .catch((e) => { - throw new GolemWorkError( - `Unable to deploy activity. ${e}`, - WorkErrorCode.ActivityDeploymentFailed, - this.activity.agreement, - this.activity, - this.activity.provider, - e, - ); - }); - - let timeoutId: NodeJS.Timeout; - - await Promise.race([ - new Promise( - (res, rej) => - (timeoutId = setTimeout( - () => rej(new GolemTimeoutError("Deploing activity has been aborted due to a timeout")), - this.activityDeployingTimeout, - )), - ), - (async () => { - for await (const res of result) { - if (res.result === "Error") - throw new GolemWorkError( - `Deploing activity failed. Error: ${res.message}`, - WorkErrorCode.ActivityDeploymentFailed, - this.activity.agreement, - this.activity, - this.activity.provider, - ); - } - })(), - ]) - .catch((error) => { - if (error instanceof GolemWorkError) { - throw error; + ); + for await (const res of result) { + if (res.result === "Error") { + throw new Error(res.message); } - throw new GolemWorkError( - `Deploing activity failed. Error: ${error.toString()}`, - WorkErrorCode.ActivityDeploymentFailed, - this.activity.agreement, - this.activity, - this.activity.provider, - error, - ); - }) - .finally(() => clearTimeout(timeoutId)); + } + } catch (error) { + throw new GolemWorkError( + `Unable to deploy activity. ${error}`, + WorkErrorCode.ActivityDeploymentFailed, + this.activity.agreement, + this.activity, + this.activity.provider, + error, + ); + } } private async setupActivity() { diff --git a/src/resource-rental/resource-rental.ts b/src/resource-rental/resource-rental.ts index 4cc63fe55..90f5273be 100644 --- a/src/resource-rental/resource-rental.ts +++ b/src/resource-rental/resource-rental.ts @@ -1,6 +1,6 @@ import { Agreement } from "../market/agreement/agreement"; import { AgreementPaymentProcess, PaymentProcessOptions } from "../payment/agreement_payment_process"; -import { Logger } from "../shared/utils"; +import { createAbortSignalFromTimeout, Logger } from "../shared/utils"; import { waitForCondition } from "../shared/utils/wait"; import { ActivityModule, ExeUnit, ExeUnitOptions } from "../activity"; import { StorageProvider } from "../shared/storage"; @@ -8,7 +8,7 @@ import { EventEmitter } from "eventemitter3"; import { NetworkNode } from "../network"; import { ExecutionOptions } from "../activity/exe-script-executor"; import { MarketModule } from "../market"; -import { GolemUserError } from "../shared/error/golem-error"; +import { GolemAbortError, GolemTimeoutError, GolemUserError } from "../shared/error/golem-error"; export interface ResourceRentalEvents { /** @@ -53,8 +53,11 @@ export class ResourceRental { * Terminates the activity and agreement (stopping any ongoing work) and finalizes the payment process. * Resolves when the rental will be fully terminated and all pending business operations finalized. * If the rental is already finalized, it will resolve immediately. + * @param signalOrTimeout - timeout in milliseconds or an AbortSignal that will be used to cancel the finalization process, + * especially the payment process. + * Please note that canceling the payment process may fail to comply with the terms of the agreement. */ - async stopAndFinalize() { + async stopAndFinalize(signalOrTimeout?: number | AbortSignal) { // Prevent this task from being performed more than once if (!this.finalizePromise) { this.finalizePromise = (async () => { @@ -74,7 +77,19 @@ export class ResourceRental { } this.logger.info("Waiting for payment process of agreement to finish", { agreementId: this.agreement.id }); - await waitForCondition(() => this.paymentProcess.isFinished()); + const abortSignal = createAbortSignalFromTimeout(signalOrTimeout); + await waitForCondition(() => this.paymentProcess.isFinished(), { + signalOrTimeout: abortSignal, + }).catch((error) => { + this.paymentProcess.stop(); + if (error instanceof GolemTimeoutError) { + throw new GolemTimeoutError( + `The finalization of payment process has been aborted due to a timeout (${+signalOrTimeout!}ms)`, + abortSignal.reason, + ); + } + throw new GolemAbortError("The finalization of payment process has been aborted", abortSignal.reason); + }); this.logger.info("Payment process for agreement finalized", { agreementId: this.agreement.id }); } catch (error) { this.logger.error("Payment process finalization failed", { agreementId: this.agreement.id, error }); @@ -93,8 +108,10 @@ export class ResourceRental { /** * Creates an activity on the Provider, and returns a exe-unit that can be used to operate within the activity + * @param signalOrTimeout - timeout in milliseconds or an AbortSignal that will be used to cancel the exe-unit request, + * especially when the exe-unit is in the process of starting, deploying and preparing the environment (including setup function) */ - async getExeUnit(): Promise { + async getExeUnit(signalOrTimeout?: number | AbortSignal): Promise { if (this.finalizePromise || this.abortController.signal.aborted) { throw new GolemUserError("The resource rental is not active. It may have been aborted or finalized"); } @@ -102,12 +119,24 @@ export class ResourceRental { return this.currentExeUnit; } + const abortController = new AbortController(); + this.abortController.signal.addEventListener("abort", () => + abortController.abort(this.abortController.signal.reason), + ); + if (signalOrTimeout) { + const abortSignal = createAbortSignalFromTimeout(signalOrTimeout); + abortSignal.addEventListener("abort", () => abortController.abort(abortSignal.reason)); + if (signalOrTimeout instanceof AbortSignal && signalOrTimeout.aborted) { + abortController.abort(signalOrTimeout.reason); + } + } + const activity = await this.activityModule.createActivity(this.agreement); this.currentExeUnit = await this.activityModule.createExeUnit(activity, { storageProvider: this.storageProvider, networkNode: this.resourceRentalOptions?.networkNode, executionOptions: this.resourceRentalOptions?.activity, - signalOrTimeout: this.abortController.signal, + signalOrTimeout: abortController.signal, ...this.resourceRentalOptions?.exeUnit, }); diff --git a/src/shared/utils/wait.ts b/src/shared/utils/wait.ts index 5782b60be..680233b39 100644 --- a/src/shared/utils/wait.ts +++ b/src/shared/utils/wait.ts @@ -1,39 +1,53 @@ -import { GolemTimeoutError } from "../error/golem-error"; +import { GolemAbortError, GolemTimeoutError } from "../error/golem-error"; /** * Utility function that helps to block the execution until a condition is met (check returns true) or the timeout happens. * * @param {function} check - The function checking if the condition is met. * @param {Object} [opts] - Options controlling the timeout and check interval in seconds. - * @param {number} [opts.timeoutSeconds=30] - The timeout value in seconds. + * @param {number} [opts.signalOrTimeout=30] - The timeout value in seconds or AbortSignal. * @param {number} [opts.intervalSeconds=1] - The interval between condition checks in seconds. * * @return {Promise} - Resolves when the condition is met or rejects with a timeout error if it wasn't met on time. */ export function waitForCondition( check: () => boolean | Promise, - opts = { timeoutSeconds: 30, intervalSeconds: 1 }, + opts?: { signalOrTimeout?: number | AbortSignal; intervalSeconds?: number }, ): Promise { + const DEFAULT_TIMEOUT_SECONDS = 30; + const abortSignal = + opts?.signalOrTimeout instanceof AbortSignal + ? opts.signalOrTimeout + : AbortSignal.timeout( + typeof opts?.signalOrTimeout === "number" ? opts.signalOrTimeout * 1000 : DEFAULT_TIMEOUT_SECONDS * 1000, + ); + const intervalSeconds = opts?.intervalSeconds ?? 1; let verifyInterval: NodeJS.Timeout | undefined; - let waitTimeout: NodeJS.Timeout | undefined; const verify = new Promise((resolve) => { verifyInterval = setInterval(async () => { if (await check()) { resolve(); } - }, opts.intervalSeconds * 1000); + }, intervalSeconds * 1000); }); const wait = new Promise((_, reject) => { - waitTimeout = setTimeout(() => { - reject(new GolemTimeoutError(`Condition was not met within ${opts.timeoutSeconds}s`)); - }, opts.timeoutSeconds * 1000); + const abortError = new GolemAbortError("Waiting for a condition has been aborted", abortSignal.reason); + if (abortSignal.aborted) { + return reject(abortError); + } + abortSignal.addEventListener("abort", () => { + reject( + abortSignal.reason.name === "TimeoutError" + ? new GolemTimeoutError(`Waiting for a condition has been aborted due to a timeout`, abortSignal.reason) + : abortError, + ); + }); }); return Promise.race([verify, wait]).finally(() => { clearInterval(verifyInterval); - clearTimeout(waitTimeout); }); } diff --git a/tests/e2e/resourceRentalPool.spec.ts b/tests/e2e/resourceRentalPool.spec.ts index b1004b7d8..7ab464c4c 100644 --- a/tests/e2e/resourceRentalPool.spec.ts +++ b/tests/e2e/resourceRentalPool.spec.ts @@ -1,5 +1,6 @@ import { Subscription } from "rxjs"; import { Allocation, DraftOfferProposalPool, GolemAbortError, GolemNetwork } from "../../src"; +import { aborted } from "node:util"; describe("ResourceRentalPool", () => { const glm = new GolemNetwork(); @@ -202,4 +203,40 @@ describe("ResourceRentalPool", () => { ); }); }); + + it("should abort getting exe-unit by timeout", async () => { + const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { replicas: 1 }); + const rental = await pool.acquire(); + await expect(rental.getExeUnit(10)).rejects.toThrow( + new GolemAbortError("Initializing of the exe-unit has been aborted due to a timeout"), + ); + }); + + it("should abort getting exe-unit by signal", async () => { + const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { replicas: 1 }); + const abortController = new AbortController(); + const rental = await pool.acquire(); + abortController.abort(); + await expect(rental.getExeUnit(abortController.signal)).rejects.toThrow( + new GolemAbortError("Initializing of the exe-unit has been aborted"), + ); + }); + + it("should abort finalizing resource rental by timeout", async () => { + const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { replicas: 1 }); + const rental = await pool.acquire(); + await expect(rental.stopAndFinalize(10)).rejects.toThrow( + new GolemAbortError("The finalization of payment process has been aborted due to a timeout (10ms)"), + ); + }); + + it("should abort finalizing resource rental by signal", async () => { + const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { replicas: 1 }); + const abortController = new AbortController(); + const rental = await pool.acquire(); + abortController.abort(); + await expect(rental.stopAndFinalize(abortController.signal)).rejects.toThrow( + new GolemAbortError("The finalization of payment process has been aborted"), + ); + }); }); From 8359b266313f83ec4285ececc85c9b48ef48cbb9 Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Thu, 20 Jun 2024 18:40:04 +0200 Subject: [PATCH 2/5] chore: fixed typo --- src/resource-rental/resource-rental.ts | 3 +-- src/shared/utils/wait.ts | 6 +++--- tests/e2e/resourceRentalPool.spec.ts | 1 - 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/resource-rental/resource-rental.ts b/src/resource-rental/resource-rental.ts index 90f5273be..248ed9c5f 100644 --- a/src/resource-rental/resource-rental.ts +++ b/src/resource-rental/resource-rental.ts @@ -53,8 +53,7 @@ export class ResourceRental { * Terminates the activity and agreement (stopping any ongoing work) and finalizes the payment process. * Resolves when the rental will be fully terminated and all pending business operations finalized. * If the rental is already finalized, it will resolve immediately. - * @param signalOrTimeout - timeout in milliseconds or an AbortSignal that will be used to cancel the finalization process, - * especially the payment process. + * @param signalOrTimeout - timeout in milliseconds or an AbortSignal that will be used to cancel the finalization process, especially the payment process. * Please note that canceling the payment process may fail to comply with the terms of the agreement. */ async stopAndFinalize(signalOrTimeout?: number | AbortSignal) { diff --git a/src/shared/utils/wait.ts b/src/shared/utils/wait.ts index 680233b39..a8f9e5152 100644 --- a/src/shared/utils/wait.ts +++ b/src/shared/utils/wait.ts @@ -37,13 +37,13 @@ export function waitForCondition( if (abortSignal.aborted) { return reject(abortError); } - abortSignal.addEventListener("abort", () => { + abortSignal.addEventListener("abort", () => reject( abortSignal.reason.name === "TimeoutError" ? new GolemTimeoutError(`Waiting for a condition has been aborted due to a timeout`, abortSignal.reason) : abortError, - ); - }); + ), + ); }); return Promise.race([verify, wait]).finally(() => { diff --git a/tests/e2e/resourceRentalPool.spec.ts b/tests/e2e/resourceRentalPool.spec.ts index 7ab464c4c..ab782e9bc 100644 --- a/tests/e2e/resourceRentalPool.spec.ts +++ b/tests/e2e/resourceRentalPool.spec.ts @@ -1,6 +1,5 @@ import { Subscription } from "rxjs"; import { Allocation, DraftOfferProposalPool, GolemAbortError, GolemNetwork } from "../../src"; -import { aborted } from "node:util"; describe("ResourceRentalPool", () => { const glm = new GolemNetwork(); From 528e0f5687509a47f5cfd99288ab47cf43b3d3b8 Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Fri, 21 Jun 2024 08:34:54 +0200 Subject: [PATCH 3/5] chore: cleanup code --- src/resource-rental/resource-rental.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/resource-rental/resource-rental.ts b/src/resource-rental/resource-rental.ts index 248ed9c5f..6c05d035e 100644 --- a/src/resource-rental/resource-rental.ts +++ b/src/resource-rental/resource-rental.ts @@ -83,7 +83,7 @@ export class ResourceRental { this.paymentProcess.stop(); if (error instanceof GolemTimeoutError) { throw new GolemTimeoutError( - `The finalization of payment process has been aborted due to a timeout (${+signalOrTimeout!}ms)`, + `The finalization of payment process has been aborted due to a timeout (${signalOrTimeout}ms)`, abortSignal.reason, ); } From 33eb509e23a4f81c46aba12b96c10f2e918e188f Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Fri, 21 Jun 2024 12:06:56 +0200 Subject: [PATCH 4/5] refactor: removed timeout value from logs --- src/resource-rental/resource-rental.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/resource-rental/resource-rental.ts b/src/resource-rental/resource-rental.ts index 6c05d035e..3f80c1901 100644 --- a/src/resource-rental/resource-rental.ts +++ b/src/resource-rental/resource-rental.ts @@ -83,7 +83,7 @@ export class ResourceRental { this.paymentProcess.stop(); if (error instanceof GolemTimeoutError) { throw new GolemTimeoutError( - `The finalization of payment process has been aborted due to a timeout (${signalOrTimeout}ms)`, + `The finalization of payment process has been aborted due to a timeout`, abortSignal.reason, ); } From 8113119d2ce76b478ffec7cf08f449f9e2aa740a Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Fri, 21 Jun 2024 12:19:31 +0200 Subject: [PATCH 5/5] refactor: changed signalOrTimeout in waitForCondition from sec to ms --- src/shared/utils/wait.ts | 11 +++-------- tests/e2e/resourceRentalPool.spec.ts | 2 +- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/src/shared/utils/wait.ts b/src/shared/utils/wait.ts index a8f9e5152..2fe8bb636 100644 --- a/src/shared/utils/wait.ts +++ b/src/shared/utils/wait.ts @@ -1,11 +1,12 @@ import { GolemAbortError, GolemTimeoutError } from "../error/golem-error"; +import { createAbortSignalFromTimeout } from "./abortSignal"; /** * Utility function that helps to block the execution until a condition is met (check returns true) or the timeout happens. * * @param {function} check - The function checking if the condition is met. * @param {Object} [opts] - Options controlling the timeout and check interval in seconds. - * @param {number} [opts.signalOrTimeout=30] - The timeout value in seconds or AbortSignal. + * @param {number} [opts.signalOrTimeout=30_000] - The timeout value in miliseconds or AbortSignal. * @param {number} [opts.intervalSeconds=1] - The interval between condition checks in seconds. * * @return {Promise} - Resolves when the condition is met or rejects with a timeout error if it wasn't met on time. @@ -14,13 +15,7 @@ export function waitForCondition( check: () => boolean | Promise, opts?: { signalOrTimeout?: number | AbortSignal; intervalSeconds?: number }, ): Promise { - const DEFAULT_TIMEOUT_SECONDS = 30; - const abortSignal = - opts?.signalOrTimeout instanceof AbortSignal - ? opts.signalOrTimeout - : AbortSignal.timeout( - typeof opts?.signalOrTimeout === "number" ? opts.signalOrTimeout * 1000 : DEFAULT_TIMEOUT_SECONDS * 1000, - ); + const abortSignal = createAbortSignalFromTimeout(opts?.signalOrTimeout ?? 30_000); const intervalSeconds = opts?.intervalSeconds ?? 1; let verifyInterval: NodeJS.Timeout | undefined; diff --git a/tests/e2e/resourceRentalPool.spec.ts b/tests/e2e/resourceRentalPool.spec.ts index ab782e9bc..67d4ecab2 100644 --- a/tests/e2e/resourceRentalPool.spec.ts +++ b/tests/e2e/resourceRentalPool.spec.ts @@ -225,7 +225,7 @@ describe("ResourceRentalPool", () => { const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { replicas: 1 }); const rental = await pool.acquire(); await expect(rental.stopAndFinalize(10)).rejects.toThrow( - new GolemAbortError("The finalization of payment process has been aborted due to a timeout (10ms)"), + new GolemAbortError("The finalization of payment process has been aborted due to a timeout"), ); });