From 8b8f723b685c6111263e2bff29ebb7dae12d569f Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Thu, 13 Jun 2024 18:00:27 +0200 Subject: [PATCH 1/9] feat: added ability to cancel lease processes --- src/activity/activity.module.ts | 10 ++- src/activity/exe-script-executor.test.ts | 23 ++++-- src/activity/exe-script-executor.ts | 57 +++++++------ src/activity/work/work.ts | 18 ++++- src/golem-network/golem-network.ts | 13 ++- src/lease-process/lease-process-pool.test.ts | 6 +- src/lease-process/lease-process-pool.ts | 20 +++-- src/lease-process/lease-process.ts | 65 ++++++++++----- src/market/draft-offer-proposal-pool.ts | 84 ++++++++------------ src/market/market.module.test.ts | 17 ++-- src/market/market.module.ts | 25 ++++-- tests/e2e/leaseProcessPool.spec.ts | 45 ++++++++++- 12 files changed, 249 insertions(+), 134 deletions(-) diff --git a/src/activity/activity.module.ts b/src/activity/activity.module.ts index 3b5d9e1e6..a0202ff61 100644 --- a/src/activity/activity.module.ts +++ b/src/activity/activity.module.ts @@ -7,6 +7,7 @@ import { WorkContext, WorkOptions } from "./work"; import { ExeScriptExecutor, ExeScriptRequest, ExecutionOptions } from "./exe-script-executor"; import { Observable, catchError, tap } from "rxjs"; import { StreamingBatchEvent } from "./results"; +import { GolemAbortError } from "../shared/error/golem-error"; export interface ActivityModule { events: EventEmitter; @@ -123,7 +124,7 @@ export class ActivityModuleImpl implements ActivityModule { } async executeScript(activity: Activity, script: ExeScriptRequest): Promise { - this.logger.info("Executing script on activity", { activityId: activity.id }); + this.logger.debug("Executing script on activity", { activityId: activity.id }); try { const result = await this.activityApi.executeScript(activity, script); this.events.emit("scriptExecuted", activity, script, result); @@ -139,7 +140,7 @@ export class ActivityModuleImpl implements ActivityModule { commandIndex?: number | undefined, timeout?: number | undefined, ): Promise { - this.logger.info("Fetching batch results", { activityId: activity.id, batchId }); + this.logger.debug("Fetching batch results", { activityId: activity.id, batchId }); try { const results = await this.activityApi.getExecBatchResults(activity, batchId, commandIndex, timeout); this.events.emit("batchResultsReceived", activity, batchId, results); @@ -154,7 +155,7 @@ export class ActivityModuleImpl implements ActivityModule { batchId: string, commandIndex?: number | undefined, ): Observable { - this.logger.info("Observing streaming batch events", { activityId: activity.id, batchId }); + this.logger.debug("Observing streaming batch events", { activityId: activity.id, batchId }); return this.activityApi.getExecBatchEvents(activity, batchId, commandIndex).pipe( tap((event) => { this.events.emit("batchEventsReceived", activity, batchId, event); @@ -240,6 +241,9 @@ export class ActivityModuleImpl implements ActivityModule { return ctx; } catch (error) { this.events.emit("errorInitializingActivity", activity, error); + if (options?.signalOrTimeout instanceof AbortSignal && options.signalOrTimeout.aborted) { + throw new GolemAbortError("Initialization of the exe-unit has been aborted", options?.signalOrTimeout.reason); + } throw error; } } diff --git a/src/activity/exe-script-executor.test.ts b/src/activity/exe-script-executor.test.ts index 73cd93550..0db976175 100644 --- a/src/activity/exe-script-executor.test.ts +++ b/src/activity/exe-script-executor.test.ts @@ -4,7 +4,7 @@ import { Capture, Deploy, DownloadFile, Run, Script, Start, Terminate, UploadFil import { buildExeScriptSuccessResult } from "../../tests/unit/helpers"; import { GolemWorkError, WorkErrorCode } from "./work"; import { Logger, sleep } from "../shared/utils"; -import { GolemError, GolemTimeoutError } from "../shared/error/golem-error"; +import { GolemAbortError, GolemError, GolemTimeoutError } from "../shared/error/golem-error"; import { ExeScriptExecutor } from "./exe-script-executor"; import { StorageProvider } from "../shared/storage"; import { from, of, throwError } from "rxjs"; @@ -86,7 +86,6 @@ describe("ExeScriptExecutor", () => { } await script.after([]); - await executor.stop(); }); it("should execute script and get results by events", async () => { @@ -131,7 +130,6 @@ describe("ExeScriptExecutor", () => { }); results.on("end", async () => { await script.after([]); - await executor.stop(); expect(resultCount).toEqual(6); return res(); }); @@ -229,16 +227,17 @@ describe("ExeScriptExecutor", () => { } expect(expectedStdout).toEqual("test"); await script.after([]); - await executor.stop(); }); }); describe("Cancelling", () => { it("should cancel executor", async () => { + const ac = new AbortController(); const executor = new ExeScriptExecutor( instance(mockActivity), instance(mockActivityModule), instance(mockLogger), + { signalOrTimeout: ac.signal }, ); const command1 = new Deploy(); const command2 = new Start(); @@ -249,10 +248,12 @@ describe("ExeScriptExecutor", () => { const script = Script.create([command1, command2, command3, command4, command5, command6]); await script.before(); const results = await executor.execute(script.getExeScriptRequest(), undefined, undefined); - await executor.stop(); + ac.abort(new Error("test reson")); return new Promise((res) => { results.on("error", (error) => { - expect(error.toString()).toMatch(/Error: Activity .* has been interrupted/); + expect(error).toMatchError( + new GolemAbortError(`Processing of batch execution has been aborted`, new Error("test reson")), + ); return res(); }); results.on("data", () => null); @@ -261,10 +262,14 @@ describe("ExeScriptExecutor", () => { it("should cancel executor while streaming batch", async () => { when(mockActivityModule.observeStreamingBatchEvents(_, _)).thenReturn(of()); + const ac = new AbortController(); const executor = new ExeScriptExecutor( instance(mockActivity), instance(mockActivityModule), instance(mockLogger), + { + signalOrTimeout: ac.signal, + }, ); const command1 = new Deploy(); const command2 = new Start(); @@ -277,10 +282,12 @@ describe("ExeScriptExecutor", () => { const script = Script.create([command1, command2, command3, command4]); await script.before(); const results = await executor.execute(script.getExeScriptRequest(), true, undefined); - await executor.stop(); + ac.abort(new Error("test reson")); return new Promise((res) => { results.on("error", (error) => { - expect(error.toString()).toMatch(/Error: Activity .* has been interrupted/); + expect(error).toMatchError( + new GolemAbortError(`Processing of batch execution has been aborted`, new Error("test reson")), + ); return res(); }); results.on("data", () => null); diff --git a/src/activity/exe-script-executor.ts b/src/activity/exe-script-executor.ts index dfead36a8..ee4ce71a0 100644 --- a/src/activity/exe-script-executor.ts +++ b/src/activity/exe-script-executor.ts @@ -1,4 +1,4 @@ -import { Logger } from "../shared/utils"; +import { createAbortSignalFromTimeout, Logger } from "../shared/utils"; import { ExecutionConfig } from "./config"; import { Readable } from "stream"; import { GolemWorkError, WorkErrorCode } from "./work"; @@ -24,13 +24,14 @@ export interface ExecutionOptions { activityExeBatchResultPollIntervalSeconds?: number; /** maximum number of retries retrieving results when an error occurs, default: 10 */ activityExeBatchResultMaxRetries?: number; + signalOrTimeout?: number | AbortSignal; } const RETRYABLE_ERROR_STATUS_CODES = [408, 500]; export class ExeScriptExecutor { - private isRunning = false; private readonly options: ExecutionConfig; + private readonly abortSignal: AbortSignal; constructor( public readonly activity: Activity, @@ -39,13 +40,7 @@ export class ExeScriptExecutor { options?: ExecutionOptions, ) { this.options = new ExecutionConfig(options); - } - - /** - * Stops the executor mid-flight - */ - public stop() { - this.isRunning = false; + this.abortSignal = createAbortSignalFromTimeout(options?.signalOrTimeout); } /** @@ -64,13 +59,14 @@ export class ExeScriptExecutor { ): Promise { let batchId: string, batchSize: number; let startTime = new Date(); - this.isRunning = true; try { + this.abortSignal.throwIfAborted(); batchId = await this.send(script); startTime = new Date(); batchSize = JSON.parse(script.text).length; + this.abortSignal.throwIfAborted(); this.logger.debug(`Script sent.`, { batchId }); return stream @@ -111,17 +107,18 @@ export class ExeScriptExecutor { const { id: activityId, agreement } = this.activity; - const isRunning = () => this.isRunning; - const { activityExecuteTimeout, activityExeBatchResultPollIntervalSeconds, activityExeBatchResultMaxRetries } = this.options; - const { logger, activity, activityModule } = this; + const { logger, activity, activityModule, abortSignal } = this; return new Readable({ objectMode: true, async read() { - while (!isBatchFinished) { + abortSignal.addEventListener("abort", () => + this.destroy(new GolemAbortError(`Processing of batch execution has been aborted`, abortSignal.reason)), + ); + while (!isBatchFinished && !abortSignal.aborted) { logger.debug("Polling for batch script execution result"); if (startTime.valueOf() + (timeout || activityExecuteTimeout) <= new Date().valueOf()) { @@ -134,9 +131,11 @@ export class ExeScriptExecutor { async (bail, attempt) => { logger.debug(`Trying to poll for batch execution results from yagna. Attempt: ${attempt}`); try { - if (!isRunning()) { + if (abortSignal.aborted) { logger.debug("Activity is no longer running, will stop polling for batch execution results"); - return bail(new GolemAbortError(`Activity ${activityId} has been interrupted.`)); + return bail( + new GolemAbortError(`Activity ${activityId} has been interrupted.`, abortSignal.reason), + ); } return await activityModule.getBatchResults( activity, @@ -171,6 +170,11 @@ export class ExeScriptExecutor { }); } } catch (error) { + if (abortSignal.aborted) { + const message = "Processing of batch execution has been aborted"; + logger.warn(message, { reason: abortSignal.reason }); + return this.destroy(new GolemAbortError(message, abortSignal.reason)); + } logger.error(`Processing batch execution results failed`, error); return this.destroy( @@ -187,8 +191,11 @@ export class ExeScriptExecutor { ); } } - - this.push(null); + if (abortSignal.aborted) { + this.destroy(new GolemAbortError(`Processing of batch execution has been aborted`, abortSignal.reason)); + } else { + this.push(null); + } }, }); } @@ -209,10 +216,9 @@ export class ExeScriptExecutor { }); let isBatchFinished = false; - const isRunning = () => this.isRunning; const activityExecuteTimeout = this.options.activityExecuteTimeout; - const { logger, activity } = this; + const { logger, activity, abortSignal } = this; return new Readable({ objectMode: true, @@ -224,9 +230,9 @@ export class ExeScriptExecutor { error = new GolemTimeoutError(`Activity ${activity.id} timeout.`); } - if (!isRunning()) { + if (abortSignal.aborted) { logger.debug("Activity is no longer running, will stop streaming batch execution results"); - error = new GolemAbortError(`Activity ${activity.id} has been interrupted.`); + error = new GolemAbortError(`Processing of batch execution has been aborted`, abortSignal.reason); } if (errors.length) { @@ -250,8 +256,11 @@ export class ExeScriptExecutor { } await sleep(500, true); } - - this.push(null); + if (abortSignal.aborted) { + this.destroy(new GolemAbortError(`Processing of batch execution has been aborted`, abortSignal.reason)); + } else { + this.push(null); + } source.unsubscribe(); }, }); diff --git a/src/activity/work/work.ts b/src/activity/work/work.ts index 2f18b8705..1ef950713 100644 --- a/src/activity/work/work.ts +++ b/src/activity/work/work.ts @@ -13,7 +13,7 @@ import { UploadFile, } from "../script"; import { NullStorageProvider, StorageProvider } from "../../shared/storage"; -import { defaultLogger, Logger, sleep, YagnaOptions } from "../../shared/utils"; +import { createAbortSignalFromTimeout, defaultLogger, Logger, sleep, YagnaOptions } from "../../shared/utils"; import { Batch } from "./batch"; import { NetworkNode } from "../../network"; import { RemoteProcess } from "./process"; @@ -39,6 +39,7 @@ export interface WorkOptions { activityReadySetupFunctions?: Worker[]; yagnaOptions?: YagnaOptions; execution?: ExecutionOptions; + signalOrTimeout?: number | AbortSignal; } export interface CommandOptions { @@ -67,6 +68,7 @@ export class WorkContext { private readonly networkNode?: NetworkNode; private executor: ExeScriptExecutor; + private readonly abortSignal: AbortSignal; constructor( public readonly activity: Activity, @@ -81,11 +83,17 @@ export class WorkContext { this.storageProvider = options?.storageProvider ?? new NullStorageProvider(); this.networkNode = options?.networkNode; - - this.executor = this.activityModule.createScriptExecutor(this.activity, this.options?.execution); + this.abortSignal = createAbortSignalFromTimeout(options?.signalOrTimeout); + this.executor = this.activityModule.createScriptExecutor(this.activity, { + ...this.options?.execution, + signalOrTimeout: this.abortSignal, + }); } private async fetchState(): Promise { + if (this.abortSignal.aborted) { + return ActivityStateEnum.Unknown; + } return this.activityModule .refreshActivity(this.activity) .then((activity) => activity.getState()) @@ -150,6 +158,10 @@ export class WorkContext { })(), ]) .catch((error) => { + if (this.abortSignal.aborted) { + this.logger.warn("Initializing of activity has been aborted", { reason: this.abortSignal.reason }); + return; + } if (error instanceof GolemWorkError) { throw error; } diff --git a/src/golem-network/golem-network.ts b/src/golem-network/golem-network.ts index 3f0b93c99..b75ded28c 100644 --- a/src/golem-network/golem-network.ts +++ b/src/golem-network/golem-network.ts @@ -331,7 +331,7 @@ export class GolemNetwork { * * @param order */ - async oneOf(order: MarketOrderSpec): Promise { + async oneOf(order: MarketOrderSpec, signalOrTimeout?: number | AbortSignal): Promise { const proposalPool = new DraftOfferProposalPool({ logger: this.logger, validateProposal: order.market.proposalFilter, @@ -354,9 +354,13 @@ export class GolemNetwork { const proposalSubscription = proposalPool.readFrom(draftProposal$); - const agreement = await this.market.signAgreementFromPool(proposalPool, { - expirationSec: order.market.rentHours * 60 * 60, - }); + const agreement = await this.market.signAgreementFromPool( + proposalPool, + { + expirationSec: order.market.rentHours * 60 * 60, + }, + signalOrTimeout, + ); const networkNode = order.network ? await this.network.createNetworkNode(order.network, agreement.getProviderInfo().id) @@ -366,6 +370,7 @@ export class GolemNetwork { payment: order.payment, activity: order.activity, networkNode, + signalOrTimeout, }); // We managed to create the activity, no need to look for more agreement candidates diff --git a/src/lease-process/lease-process-pool.test.ts b/src/lease-process/lease-process-pool.test.ts index 34bc59aea..f96694b6d 100644 --- a/src/lease-process/lease-process-pool.test.ts +++ b/src/lease-process/lease-process-pool.test.ts @@ -57,13 +57,13 @@ describe("LeaseProcessPool", () => { await pool.ready(); expect(pool.getAvailableSize()).toBe(5); - verify(marketModule.signAgreementFromPool(_, _)).times(5); + verify(marketModule.signAgreementFromPool(_, _, _)).times(5); }); it("retries on error", async () => { when(leaseModule.createLease(_, _, _)).thenCall(() => ({}) as LeaseProcess); const fakeAgreement = {} as Agreement; - when(marketModule.signAgreementFromPool(_, _)) + when(marketModule.signAgreementFromPool(_, _, _)) .thenResolve(fakeAgreement) .thenReject(new Error("Failed to propose agreement")) .thenResolve(fakeAgreement) @@ -75,7 +75,7 @@ describe("LeaseProcessPool", () => { await pool.ready(); expect(pool.getAvailableSize()).toBe(3); - verify(marketModule.signAgreementFromPool(_, _)).times(5); + verify(marketModule.signAgreementFromPool(_, _, _)).times(5); }); it("stops retrying after abort signal is triggered", async () => { const pool = getLeasePool({ min: 3 }); diff --git a/src/lease-process/lease-process-pool.ts b/src/lease-process/lease-process-pool.ts index 299774cdf..3c5856491 100644 --- a/src/lease-process/lease-process-pool.ts +++ b/src/lease-process/lease-process-pool.ts @@ -100,16 +100,21 @@ export class LeaseProcessPool { })() || MAX_REPLICAS; } - private async createNewLeaseProcess() { + private async createNewLeaseProcess(signalOrTimeout?: number | AbortSignal) { this.logger.debug("Creating new lease process to add to pool"); try { - const agreement = await this.marketModule.signAgreementFromPool(this.proposalPool, this.agreementOptions); + const agreement = await this.marketModule.signAgreementFromPool( + this.proposalPool, + this.agreementOptions, + signalOrTimeout, + ); const networkNode = this.network ? await this.networkModule.createNetworkNode(this.network, agreement.getProviderInfo().id) : undefined; const leaseProcess = this.leaseModule.createLease(agreement, this.allocation, { networkNode, ...this.leaseProcessOptions, + signalOrTimeout, }); this.events.emit("created", agreement); return leaseProcess; @@ -176,7 +181,7 @@ export class LeaseProcessPool { /** * Borrow a lease process from the pool. If there is no valid lease process a new one will be created. */ - async acquire(): Promise { + async acquire(signalOrTimeout?: number | AbortSignal): Promise { if (this.isDraining) { throw new Error("The pool is in draining mode"); } @@ -185,7 +190,7 @@ export class LeaseProcessPool { if (!this.canCreateMoreLeaseProcesses()) { return this.enqueueAcquire(); } - leaseProcess = await this.createNewLeaseProcess(); + leaseProcess = await this.createNewLeaseProcess(signalOrTimeout); } this.borrowed.add(leaseProcess); this.events.emit("acquired", leaseProcess.agreement); @@ -349,8 +354,11 @@ export class LeaseProcessPool { * }); * ``` */ - public async withLease(callback: (lease: LeaseProcess) => Promise): Promise { - const lease = await this.acquire(); + public async withLease( + callback: (lease: LeaseProcess) => Promise, + signalOrTimeout?: number | AbortSignal, + ): Promise { + const lease = await this.acquire(signalOrTimeout); try { return await callback(lease); } finally { diff --git a/src/lease-process/lease-process.ts b/src/lease-process/lease-process.ts index 489862c04..fb6418da2 100644 --- a/src/lease-process/lease-process.ts +++ b/src/lease-process/lease-process.ts @@ -1,6 +1,6 @@ import { Agreement } from "../market/agreement/agreement"; import { AgreementPaymentProcess, PaymentProcessOptions } from "../payment/agreement_payment_process"; -import { Logger } from "../shared/utils"; +import { createAbortSignalFromTimeout, Logger } from "../shared/utils"; import { waitForCondition } from "../shared/utils/wait"; import { ActivityModule, WorkContext } from "../activity"; import { StorageProvider } from "../shared/storage"; @@ -8,6 +8,7 @@ import { EventEmitter } from "eventemitter3"; import { NetworkNode } from "../network"; import { ExecutionOptions } from "../activity/exe-script-executor"; import { MarketModule } from "../market"; +import { GolemUserError } from "../shared/error/golem-error"; export interface LeaseProcessEvents { /** @@ -20,6 +21,7 @@ export interface LeaseProcessOptions { activity?: ExecutionOptions; payment?: Partial; networkNode?: NetworkNode; + signalOrTimeout?: number | AbortSignal; } /** @@ -31,6 +33,8 @@ export class LeaseProcess { public readonly networkNode?: NetworkNode; private currentWorkContext: WorkContext | null = null; + private abortController = new AbortController(); + private finalizeTask?: () => Promise; public constructor( public readonly agreement: Agreement, @@ -43,6 +47,13 @@ export class LeaseProcess { ) { this.networkNode = this.leaseOptions?.networkNode; + const abortSignal = createAbortSignalFromTimeout(leaseOptions?.signalOrTimeout); + abortSignal.addEventListener("abort", () => this.abortController.abort(abortSignal.reason)); + this.abortController.signal.addEventListener("abort", () => { + this.logger.warn("The lease process has been aborted.", { reason: this.abortController.signal.reason }); + this.finalize(); + }); + // TODO: Listen to agreement events to know when it goes down due to provider closing it! } @@ -51,36 +62,49 @@ export class LeaseProcess { * If the lease is already finalized, it will resolve immediately. */ async finalize() { - if (this.paymentProcess.isFinished()) { - return; - } - - try { - this.logger.debug("Waiting for payment process of agreement to finish", { agreementId: this.agreement.id }); - if (this.currentWorkContext) { - await this.activityModule.destroyActivity(this.currentWorkContext.activity); - if ((await this.fetchAgreementState()) !== "Terminated") { - await this.marketModule.terminateAgreement(this.agreement); + // Prevent to call this method more than once + if (!this.finalizeTask) { + this.finalizeTask = async () => { + if (this.paymentProcess.isFinished()) { + return; } - } - await waitForCondition(() => this.paymentProcess.isFinished()); - this.logger.debug("Payment process for agreement finalized", { agreementId: this.agreement.id }); - } catch (error) { - this.logger.error("Payment process finalization failed", { agreementId: this.agreement.id, error }); - throw error; - } finally { - this.events.emit("finalized"); + try { + this.logger.info("Waiting for payment process of agreement to finish", { agreementId: this.agreement.id }); + if (this.currentWorkContext) { + await this.activityModule.destroyActivity(this.currentWorkContext.activity); + } + if ((await this.fetchAgreementState()) !== "Terminated") { + await this.marketModule.terminateAgreement(this.agreement); + } + await waitForCondition(() => this.paymentProcess.isFinished()); + this.logger.info("Payment process for agreement finalized", { agreementId: this.agreement.id }); + } catch (error) { + this.logger.error("Payment process finalization failed", { agreementId: this.agreement.id, error }); + throw error; + } finally { + this.events.emit("finalized"); + } + }; } + return this.finalizeTask(); } public hasActivity(): boolean { return this.currentWorkContext !== null; } + public abort(reason?: Error | string) { + this.abortController.abort(reason); + return this.abortController; + } + /** * Creates an activity on the Provider, and returns a work context that can be used to operate within the activity */ async getExeUnit(): Promise { + if (this.finalizeTask || this.abortController.signal.aborted) { + throw new GolemUserError("The lease process is not active. It may have been aborted or finalized"); + } if (this.currentWorkContext) { return this.currentWorkContext; } @@ -89,7 +113,8 @@ export class LeaseProcess { this.currentWorkContext = await this.activityModule.createWorkContext(activity, { storageProvider: this.storageProvider, networkNode: this.leaseOptions?.networkNode, - execution: this.leaseOptions?.activity, + execution: { ...this.leaseOptions?.activity }, + signalOrTimeout: this.abortController.signal, }); return this.currentWorkContext; diff --git a/src/market/draft-offer-proposal-pool.ts b/src/market/draft-offer-proposal-pool.ts index 5c9734429..e13e13cd9 100644 --- a/src/market/draft-offer-proposal-pool.ts +++ b/src/market/draft-offer-proposal-pool.ts @@ -2,8 +2,9 @@ import { OfferProposal, ProposalFilter } from "./proposal/offer-proposal"; import AsyncLock from "async-lock"; import { EventEmitter } from "eventemitter3"; import { GolemMarketError, MarketErrorCode } from "./error"; -import { defaultLogger, Logger, sleep } from "../shared/utils"; +import { createAbortSignalFromTimeout, defaultLogger, Logger, sleep } from "../shared/utils"; import { Observable, Subscription } from "rxjs"; +import { GolemAbortError, GolemTimeoutError } from "../shared/error/golem-error"; export type ProposalSelector = (proposals: OfferProposal[]) => OfferProposal; @@ -27,13 +28,6 @@ export interface ProposalPoolOptions { */ minCount?: number; - /** - * Number of seconds to wait for an acquire call to finish before throwing an exception - * - * @default 30 - */ - acquireTimeoutSec?: number; - logger?: Logger; } @@ -65,9 +59,6 @@ export class DraftOfferProposalPool { /** {@link ProposalPoolOptions.minCount} */ private readonly minCount: number = 0; - /** {@link ProposalPoolOptions.acquireTimeoutSec} */ - private readonly acquireTimeoutSec: number = 30; - /** {@link ProposalPoolOptions.selectProposal} */ private readonly selectProposal: ProposalSelector = (proposals: OfferProposal[]) => proposals[0]; @@ -96,10 +87,6 @@ export class DraftOfferProposalPool { this.minCount = options.minCount; } - if (options?.acquireTimeoutSec && options.acquireTimeoutSec >= 0) { - this.acquireTimeoutSec = options?.acquireTimeoutSec; - } - this.logger = this.logger = options?.logger || defaultLogger("proposal-pool"); } @@ -118,46 +105,45 @@ export class DraftOfferProposalPool { } /** - * Attempts to obtain a single proposal from the pool - * - * This method will reject if no suitable proposal will be found within {@link DraftOfferProposalPool.acquireTimeoutSec} seconds. + * Attempts to obtain a single proposal from the poolonds. + * @param TODO */ - public acquire(): Promise { - return this.lock.acquire( - "proposal-pool", - async () => { - let proposal: OfferProposal | null = null; - - while (proposal === null) { - // Try to get one - proposal = this.available.size > 0 ? this.selectProposal([...this.available]) : null; - - if (proposal) { - // Validate - if (!this.validateProposal(proposal)) { - // Drop if not valid - this.removeFromAvailable(proposal); - // Keep searching - proposal = null; - } - } - // if not found or not valid wait a while for next try - if (!proposal) { - await sleep(1); + public acquire(signalOrTimeout?: number | AbortSignal): Promise { + const signal = createAbortSignalFromTimeout(signalOrTimeout); + return this.lock.acquire("proposal-pool", async () => { + let proposal: OfferProposal | null = null; + + while (proposal === null) { + if (signal.aborted) { + throw signal.reason.name === "TimeoutError" + ? new GolemTimeoutError("Could not provide any proposal in time") + : new GolemAbortError("The acquiring of proposals has been interrupted", signal.reason); + } + // Try to get one + proposal = this.available.size > 0 ? this.selectProposal([...this.available]) : null; + + if (proposal) { + // Validate + if (!this.validateProposal(proposal)) { + // Drop if not valid + this.removeFromAvailable(proposal); + // Keep searching + proposal = null; } } + // if not found or not valid wait a while for next try + if (!proposal) { + await sleep(1); + } + } - this.available.delete(proposal); - this.leased.add(proposal); + this.available.delete(proposal); + this.leased.add(proposal); - this.events.emit("acquired", proposal); + this.events.emit("acquired", proposal); - return proposal; - }, - { - maxOccupationTime: this.acquireTimeoutSec * 1000, - }, - ); + return proposal; + }); } /** diff --git a/src/market/market.module.test.ts b/src/market/market.module.test.ts index 4715229f8..9352c5282 100644 --- a/src/market/market.module.test.ts +++ b/src/market/market.module.test.ts @@ -14,6 +14,7 @@ import { DraftOfferProposalPool } from "./draft-offer-proposal-pool"; import { Agreement, AgreementEvent, ProviderInfo } from "./agreement"; import { waitAndCall, waitForCondition } from "../shared/utils/wait"; import { MarketOrderSpec } from "../golem-network"; +import { GolemAbortError } from "../shared/error/golem-error"; const mockMarketApiAdapter = mock(MarketApiAdapter); const mockYagna = mock(YagnaApi); @@ -430,7 +431,7 @@ describe("Market module", () => { const badProposal1 = {} as OfferProposal; const goodProposal = {} as OfferProposal; const mockPool = mock(DraftOfferProposalPool); - when(mockPool.acquire()).thenResolve(badProposal0).thenResolve(badProposal1).thenResolve(goodProposal); + when(mockPool.acquire(_)).thenResolve(badProposal0).thenResolve(badProposal1).thenResolve(goodProposal); when(mockPool.remove(_)).thenResolve(); const goodAgreement = {} as Agreement; const marketSpy = spy(marketModule); @@ -440,7 +441,7 @@ describe("Market module", () => { const signedProposal = await marketModule.signAgreementFromPool(instance(mockPool)); - verify(mockPool.acquire()).thrice(); + verify(mockPool.acquire(_)).thrice(); verify(marketSpy.proposeAgreement(badProposal0, _)).once(); verify(mockPool.remove(badProposal0)).once(); verify(marketSpy.proposeAgreement(badProposal1, _)).once(); @@ -454,15 +455,17 @@ describe("Market module", () => { const error = new Error("Operation cancelled"); const proposal = {} as OfferProposal; const mockPool = mock(DraftOfferProposalPool); - when(mockPool.acquire()).thenCall(async () => { + when(mockPool.acquire(_)).thenCall(async () => { ac.abort(error); return proposal; }); const marketSpy = spy(marketModule); - await expect(marketModule.signAgreementFromPool(instance(mockPool), {}, ac.signal)).rejects.toThrow(error); + await expect(marketModule.signAgreementFromPool(instance(mockPool), {}, ac.signal)).rejects.toMatchError( + new GolemAbortError("The signing of the agreement has been interrupted", error), + ); - verify(mockPool.acquire()).once(); + verify(mockPool.acquire(_)).once(); verify(mockPool.release(proposal)).once(); verify(mockPool.remove(_)).never(); verify(marketSpy.proposeAgreement(_)).never(); @@ -471,7 +474,7 @@ describe("Market module", () => { const mockPool = mock(DraftOfferProposalPool); const signal = AbortSignal.abort(); await expect(marketModule.signAgreementFromPool(instance(mockPool), {}, signal)).rejects.toThrow( - "This operation was aborted", + "The signing of the agreement has been interrupted", ); verify(mockPool.acquire()).never(); }); @@ -483,7 +486,7 @@ describe("Market module", () => { when(marketSpy.proposeAgreement(_)).thenReject(new Error("Failed to sign proposal")); await expect(marketModule.signAgreementFromPool(instance(mockPool), {}, 50)).rejects.toThrow( - "The operation was aborted due to timeout", + "Could not sign any agreement in time", ); }); it("respects the timeout on draft proposal pool acquire and forwards the error", async () => { diff --git a/src/market/market.module.ts b/src/market/market.module.ts index dd7651c8c..4020d42b3 100644 --- a/src/market/market.module.ts +++ b/src/market/market.module.ts @@ -37,7 +37,7 @@ import { WorkloadDemandDirector } from "./demand/directors/workload-demand-direc import { WorkloadDemandDirectorConfigOptions } from "./demand/options"; import { BasicDemandDirectorConfig } from "./demand/directors/basic-demand-director-config"; import { PaymentDemandDirectorConfig } from "./demand/directors/payment-demand-director-config"; -import { GolemUserError } from "../shared/error/golem-error"; +import { GolemAbortError, GolemTimeoutError, GolemUserError } from "../shared/error/golem-error"; import { MarketOrderSpec } from "../golem-network"; import { INetworkApi, NetworkModule } from "../network"; import { AgreementOptions } from "./agreement/agreement"; @@ -470,14 +470,27 @@ export class MarketModuleImpl implements MarketModule { ): Promise { const signal = createAbortSignalFromTimeout(signalOrTimeout); - const tryProposing = async (): Promise => { - signal.throwIfAborted(); - const proposal = await draftProposalPool.acquire(); - if (signal.aborted) { - await draftProposalPool.release(proposal); + const getProposal = async () => { + try { signal.throwIfAborted(); + const proposal = await draftProposalPool.acquire(signalOrTimeout); + if (signal.aborted) { + await draftProposalPool.release(proposal); + signal.throwIfAborted(); + } + return proposal; + } catch (error) { + if (signal.aborted) { + throw signal.reason.name === "TimeoutError" + ? new GolemTimeoutError("Could not sign any agreement in time") + : new GolemAbortError("The signing of the agreement has been interrupted", error); + } + throw error; } + }; + const tryProposing = async (): Promise => { + const proposal = await getProposal(); try { const agreement = await this.proposeAgreement(proposal, agreementOptions); // agreement is valid, proposal can be destroyed diff --git a/tests/e2e/leaseProcessPool.spec.ts b/tests/e2e/leaseProcessPool.spec.ts index f25264b06..5e6c4d116 100644 --- a/tests/e2e/leaseProcessPool.spec.ts +++ b/tests/e2e/leaseProcessPool.spec.ts @@ -1,5 +1,5 @@ import { Subscription } from "rxjs"; -import { Allocation, DraftOfferProposalPool, GolemNetwork } from "../../src"; +import { Allocation, DraftOfferProposalPool, GolemAbortError, GolemNetwork } from "../../src"; describe("LeaseProcessPool", () => { const glm = new GolemNetwork(); @@ -158,4 +158,47 @@ describe("LeaseProcessPool", () => { await pool.drainAndClear(); await glm.network.removeNetwork(network); }); + + it("should abort lease process by abort call", async () => { + const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 1 }); + const leaseProcess = await pool.acquire(); + return new Promise(async (res) => { + leaseProcess.events.on("finalized", async () => res(true)); + leaseProcess.abort("test reason"); + }); + }); + + it("should abort lease process during execution by abort call", async () => { + const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 1 }); + const leaseProcess = await pool.acquire(); + const exe = await leaseProcess.getExeUnit(); + setTimeout(() => leaseProcess.abort(), 8_000); + await expect(exe.run("sleep 10 && echo Hello World")).rejects.toThrow( + new GolemAbortError("Processing of batch execution has been aborted"), + ); + await pool.drainAndClear(); + }); + + it("should abort lease during execution process by signal", async () => { + const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 1 }); + const ac = new AbortController(); + const leaseProcess = await pool.acquire(ac.signal); + const exe = await leaseProcess.getExeUnit(); + setTimeout(() => ac.abort(), 8_000); + await expect(exe.run("sleep 10 && echo Hello World")).rejects.toThrow( + new GolemAbortError("Processing of batch execution has been aborted"), + ); + await pool.drainAndClear(); + }); + + it("should abort lease during execution process by timeout", async () => { + const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 1 }); + const ac = new AbortController(); + const leaseProcess = await pool.acquire(8_000); + const exe = await leaseProcess.getExeUnit(); + await expect(exe.run("sleep 10 && echo Hello World")).rejects.toThrow( + new GolemAbortError("Processing of batch execution has been aborted"), + ); + await pool.drainAndClear(); + }); }); From ed29a6a0b16f758dbbf4d9ef3c393dc21d193a4e Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Mon, 17 Jun 2024 15:44:24 +0200 Subject: [PATCH 2/9] refactor: assigned cancel behaviour to finalize in lease process --- src/activity/activity.module.ts | 14 ++++- src/activity/exe-script-executor.test.ts | 12 ++--- src/activity/exe-script-executor.ts | 37 +++++++------ src/activity/work/work.ts | 3 +- src/golem-network/golem-network.ts | 1 - src/lease-process/lease-process-pool.ts | 3 +- src/lease-process/lease-process.ts | 68 +++++++++++------------- src/market/market.module.test.ts | 4 +- src/market/market.module.ts | 2 +- tests/e2e/leaseProcessPool.spec.ts | 43 ++++----------- 10 files changed, 83 insertions(+), 104 deletions(-) diff --git a/src/activity/activity.module.ts b/src/activity/activity.module.ts index a0202ff61..5ad923b9c 100644 --- a/src/activity/activity.module.ts +++ b/src/activity/activity.module.ts @@ -7,7 +7,7 @@ import { WorkContext, WorkOptions } from "./work"; import { ExeScriptExecutor, ExeScriptRequest, ExecutionOptions } from "./exe-script-executor"; import { Observable, catchError, tap } from "rxjs"; import { StreamingBatchEvent } from "./results"; -import { GolemAbortError } from "../shared/error/golem-error"; +import { GolemAbortError, GolemTimeoutError } from "../shared/error/golem-error"; export interface ActivityModule { events: EventEmitter; @@ -242,7 +242,17 @@ export class ActivityModuleImpl implements ActivityModule { } catch (error) { this.events.emit("errorInitializingActivity", activity, error); if (options?.signalOrTimeout instanceof AbortSignal && options.signalOrTimeout.aborted) { - throw new GolemAbortError("Initialization of the exe-unit has been aborted", options?.signalOrTimeout.reason); + const error = + options.signalOrTimeout.reason.name === "TimeoutError" + ? new GolemTimeoutError( + "Initializing of the exe-unit has been aborted due to a timeout", + options.signalOrTimeout.reason, + ) + : new GolemAbortError( + "Initializing of the exe-unit has been aborted by a signal", + options.signalOrTimeout.reason, + ); + throw error; } throw error; } diff --git a/src/activity/exe-script-executor.test.ts b/src/activity/exe-script-executor.test.ts index 0db976175..bcc0fda77 100644 --- a/src/activity/exe-script-executor.test.ts +++ b/src/activity/exe-script-executor.test.ts @@ -248,12 +248,10 @@ describe("ExeScriptExecutor", () => { const script = Script.create([command1, command2, command3, command4, command5, command6]); await script.before(); const results = await executor.execute(script.getExeScriptRequest(), undefined, undefined); - ac.abort(new Error("test reson")); + ac.abort(); return new Promise((res) => { results.on("error", (error) => { - expect(error).toMatchError( - new GolemAbortError(`Processing of batch execution has been aborted`, new Error("test reson")), - ); + expect(error).toEqual(new GolemAbortError(`Processing of script execution has been aborted`)); return res(); }); results.on("data", () => null); @@ -282,12 +280,10 @@ describe("ExeScriptExecutor", () => { const script = Script.create([command1, command2, command3, command4]); await script.before(); const results = await executor.execute(script.getExeScriptRequest(), true, undefined); - ac.abort(new Error("test reson")); + ac.abort(); return new Promise((res) => { results.on("error", (error) => { - expect(error).toMatchError( - new GolemAbortError(`Processing of batch execution has been aborted`, new Error("test reson")), - ); + expect(error).toEqual(new GolemAbortError(`Processing of script execution has been aborted`)); return res(); }); results.on("data", () => null); diff --git a/src/activity/exe-script-executor.ts b/src/activity/exe-script-executor.ts index ee4ce71a0..c9c998be8 100644 --- a/src/activity/exe-script-executor.ts +++ b/src/activity/exe-script-executor.ts @@ -113,11 +113,12 @@ export class ExeScriptExecutor { return new Readable({ objectMode: true, - async read() { - abortSignal.addEventListener("abort", () => - this.destroy(new GolemAbortError(`Processing of batch execution has been aborted`, abortSignal.reason)), - ); + const abortError = new GolemAbortError("Processing of script execution has been aborted", abortSignal.reason); + abortSignal.addEventListener("abort", () => { + logger.warn(abortError.message, { activityId: activity.id, batchId, reason: abortSignal.reason }); + this.destroy(abortError); + }); while (!isBatchFinished && !abortSignal.aborted) { logger.debug("Polling for batch script execution result"); @@ -132,9 +133,8 @@ export class ExeScriptExecutor { logger.debug(`Trying to poll for batch execution results from yagna. Attempt: ${attempt}`); try { if (abortSignal.aborted) { - logger.debug("Activity is no longer running, will stop polling for batch execution results"); return bail( - new GolemAbortError(`Activity ${activityId} has been interrupted.`, abortSignal.reason), + new GolemAbortError(`Processing of script execution has been aborted`, abortSignal.reason), ); } return await activityModule.getBatchResults( @@ -171,11 +171,9 @@ export class ExeScriptExecutor { } } catch (error) { if (abortSignal.aborted) { - const message = "Processing of batch execution has been aborted"; - logger.warn(message, { reason: abortSignal.reason }); - return this.destroy(new GolemAbortError(message, abortSignal.reason)); + return; } - logger.error(`Processing batch execution results failed`, error); + logger.error(`Processing script execution results failed`, error); return this.destroy( error instanceof GolemWorkError @@ -192,10 +190,10 @@ export class ExeScriptExecutor { } } if (abortSignal.aborted) { - this.destroy(new GolemAbortError(`Processing of batch execution has been aborted`, abortSignal.reason)); - } else { - this.push(null); + this.destroy(abortError); + return; } + this.push(null); }, }); } @@ -223,7 +221,12 @@ export class ExeScriptExecutor { return new Readable({ objectMode: true, async read() { - while (!isBatchFinished) { + const abortError = new GolemAbortError("Processing of script execution has been aborted", abortSignal.reason); + abortSignal.addEventListener("abort", () => { + logger.warn(abortError.message, { activityId: activity.id, batchId, reason: abortSignal.reason }); + this.destroy(abortError); + }); + while (!isBatchFinished && !abortSignal.aborted) { let error: Error | undefined; if (startTime.valueOf() + (timeout || activityExecuteTimeout) <= new Date().valueOf()) { logger.debug("Activity probably timed-out, will stop streaming batch execution results"); @@ -231,8 +234,7 @@ export class ExeScriptExecutor { } if (abortSignal.aborted) { - logger.debug("Activity is no longer running, will stop streaming batch execution results"); - error = new GolemAbortError(`Processing of batch execution has been aborted`, abortSignal.reason); + error = abortError; } if (errors.length) { @@ -257,7 +259,8 @@ export class ExeScriptExecutor { await sleep(500, true); } if (abortSignal.aborted) { - this.destroy(new GolemAbortError(`Processing of batch execution has been aborted`, abortSignal.reason)); + this.destroy(abortError); + return; } else { this.push(null); } diff --git a/src/activity/work/work.ts b/src/activity/work/work.ts index 1ef950713..a713ed0d9 100644 --- a/src/activity/work/work.ts +++ b/src/activity/work/work.ts @@ -159,7 +159,8 @@ export class WorkContext { ]) .catch((error) => { if (this.abortSignal.aborted) { - this.logger.warn("Initializing of activity has been aborted", { reason: this.abortSignal.reason }); + const message = "Initializing of activity has been aborted"; + this.logger.warn(message, { activityId: this.activity.id, reason: this.abortSignal.reason }); return; } if (error instanceof GolemWorkError) { diff --git a/src/golem-network/golem-network.ts b/src/golem-network/golem-network.ts index b75ded28c..58a26e7f4 100644 --- a/src/golem-network/golem-network.ts +++ b/src/golem-network/golem-network.ts @@ -370,7 +370,6 @@ export class GolemNetwork { payment: order.payment, activity: order.activity, networkNode, - signalOrTimeout, }); // We managed to create the activity, no need to look for more agreement candidates diff --git a/src/lease-process/lease-process-pool.ts b/src/lease-process/lease-process-pool.ts index 3c5856491..31a1a4a51 100644 --- a/src/lease-process/lease-process-pool.ts +++ b/src/lease-process/lease-process-pool.ts @@ -114,7 +114,6 @@ export class LeaseProcessPool { const leaseProcess = this.leaseModule.createLease(agreement, this.allocation, { networkNode, ...this.leaseProcessOptions, - signalOrTimeout, }); this.events.emit("created", agreement); return leaseProcess; @@ -230,7 +229,7 @@ export class LeaseProcessPool { async destroy(leaseProcess: LeaseProcess): Promise { try { this.borrowed.delete(leaseProcess); - this.logger.debug("Destroying lease process from the pool", { agreementId: leaseProcess.agreement.id }); + this.logger.info("Destroying lease process from the pool", { agreementId: leaseProcess.agreement.id }); await Promise.all([leaseProcess.finalize(), this.removeNetworkNode(leaseProcess)]); this.events.emit("destroyed", leaseProcess.agreement); } catch (error) { diff --git a/src/lease-process/lease-process.ts b/src/lease-process/lease-process.ts index fb6418da2..d5866c030 100644 --- a/src/lease-process/lease-process.ts +++ b/src/lease-process/lease-process.ts @@ -1,6 +1,6 @@ import { Agreement } from "../market/agreement/agreement"; import { AgreementPaymentProcess, PaymentProcessOptions } from "../payment/agreement_payment_process"; -import { createAbortSignalFromTimeout, Logger } from "../shared/utils"; +import { Logger } from "../shared/utils"; import { waitForCondition } from "../shared/utils/wait"; import { ActivityModule, WorkContext } from "../activity"; import { StorageProvider } from "../shared/storage"; @@ -9,6 +9,7 @@ import { NetworkNode } from "../network"; import { ExecutionOptions } from "../activity/exe-script-executor"; import { MarketModule } from "../market"; import { GolemUserError } from "../shared/error/golem-error"; +import AsyncLock from "async-lock"; export interface LeaseProcessEvents { /** @@ -21,7 +22,6 @@ export interface LeaseProcessOptions { activity?: ExecutionOptions; payment?: Partial; networkNode?: NetworkNode; - signalOrTimeout?: number | AbortSignal; } /** @@ -34,7 +34,8 @@ export class LeaseProcess { private currentWorkContext: WorkContext | null = null; private abortController = new AbortController(); - private finalizeTask?: () => Promise; + private finalizeTask?: Promise; + private asyncLock = new AsyncLock(); public constructor( public readonly agreement: Agreement, @@ -47,13 +48,6 @@ export class LeaseProcess { ) { this.networkNode = this.leaseOptions?.networkNode; - const abortSignal = createAbortSignalFromTimeout(leaseOptions?.signalOrTimeout); - abortSignal.addEventListener("abort", () => this.abortController.abort(abortSignal.reason)); - this.abortController.signal.addEventListener("abort", () => { - this.logger.warn("The lease process has been aborted.", { reason: this.abortController.signal.reason }); - this.finalize(); - }); - // TODO: Listen to agreement events to know when it goes down due to provider closing it! } @@ -62,42 +56,40 @@ export class LeaseProcess { * If the lease is already finalized, it will resolve immediately. */ async finalize() { - // Prevent to call this method more than once - if (!this.finalizeTask) { - this.finalizeTask = async () => { - if (this.paymentProcess.isFinished()) { - return; - } - try { - this.logger.info("Waiting for payment process of agreement to finish", { agreementId: this.agreement.id }); - if (this.currentWorkContext) { - await this.activityModule.destroyActivity(this.currentWorkContext.activity); + return this.asyncLock.acquire("finalize", () => { + // Prevent this task from being performed more than once + if (!this.finalizeTask) { + this.finalizeTask = (async () => { + this.abortController.abort("Finalising the leasing rocess"); + if (this.paymentProcess.isFinished()) { + return; } - if ((await this.fetchAgreementState()) !== "Terminated") { - await this.marketModule.terminateAgreement(this.agreement); + try { + this.logger.info("Waiting for payment process of agreement to finish", { agreementId: this.agreement.id }); + if (this.currentWorkContext) { + await this.activityModule.destroyActivity(this.currentWorkContext.activity); + } + if ((await this.fetchAgreementState()) !== "Terminated") { + await this.marketModule.terminateAgreement(this.agreement); + } + await waitForCondition(() => this.paymentProcess.isFinished()); + this.logger.info("Payment process for agreement finalized", { agreementId: this.agreement.id }); + } catch (error) { + this.logger.error("Payment process finalization failed", { agreementId: this.agreement.id, error }); + throw error; + } finally { + this.events.emit("finalized"); } - await waitForCondition(() => this.paymentProcess.isFinished()); - this.logger.info("Payment process for agreement finalized", { agreementId: this.agreement.id }); - } catch (error) { - this.logger.error("Payment process finalization failed", { agreementId: this.agreement.id, error }); - throw error; - } finally { - this.events.emit("finalized"); - } - }; - } - return this.finalizeTask(); + })(); + } + return this.finalizeTask; + }); } public hasActivity(): boolean { return this.currentWorkContext !== null; } - public abort(reason?: Error | string) { - this.abortController.abort(reason); - return this.abortController; - } - /** * Creates an activity on the Provider, and returns a work context that can be used to operate within the activity */ diff --git a/src/market/market.module.test.ts b/src/market/market.module.test.ts index 9352c5282..875cba6a6 100644 --- a/src/market/market.module.test.ts +++ b/src/market/market.module.test.ts @@ -462,7 +462,7 @@ describe("Market module", () => { const marketSpy = spy(marketModule); await expect(marketModule.signAgreementFromPool(instance(mockPool), {}, ac.signal)).rejects.toMatchError( - new GolemAbortError("The signing of the agreement has been interrupted", error), + new GolemAbortError("The signing of the agreement has been aborted", error), ); verify(mockPool.acquire(_)).once(); @@ -474,7 +474,7 @@ describe("Market module", () => { const mockPool = mock(DraftOfferProposalPool); const signal = AbortSignal.abort(); await expect(marketModule.signAgreementFromPool(instance(mockPool), {}, signal)).rejects.toThrow( - "The signing of the agreement has been interrupted", + "The signing of the agreement has been aborted", ); verify(mockPool.acquire()).never(); }); diff --git a/src/market/market.module.ts b/src/market/market.module.ts index 4020d42b3..5820f941e 100644 --- a/src/market/market.module.ts +++ b/src/market/market.module.ts @@ -483,7 +483,7 @@ export class MarketModuleImpl implements MarketModule { if (signal.aborted) { throw signal.reason.name === "TimeoutError" ? new GolemTimeoutError("Could not sign any agreement in time") - : new GolemAbortError("The signing of the agreement has been interrupted", error); + : new GolemAbortError("The signing of the agreement has been aborted", error); } throw error; } diff --git a/tests/e2e/leaseProcessPool.spec.ts b/tests/e2e/leaseProcessPool.spec.ts index 5e6c4d116..53ed91f2c 100644 --- a/tests/e2e/leaseProcessPool.spec.ts +++ b/tests/e2e/leaseProcessPool.spec.ts @@ -1,5 +1,6 @@ import { Subscription } from "rxjs"; import { Allocation, DraftOfferProposalPool, GolemAbortError, GolemNetwork } from "../../src"; +import exp from "node:constants"; describe("LeaseProcessPool", () => { const glm = new GolemNetwork(); @@ -159,46 +160,24 @@ describe("LeaseProcessPool", () => { await glm.network.removeNetwork(network); }); - it("should abort lease process by abort call", async () => { + it("should abort acquiring lease process by signal", async () => { const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 1 }); - const leaseProcess = await pool.acquire(); - return new Promise(async (res) => { - leaseProcess.events.on("finalized", async () => res(true)); - leaseProcess.abort("test reason"); - }); + const abortControler = new AbortController(); + abortControler.abort(); + await expect(pool.acquire(abortControler.signal)).rejects.toThrow("The signing of the agreement has been aborted"); }); - it("should abort lease process during execution by abort call", async () => { + it("should finalize lease process during execution", async () => { const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 1 }); const leaseProcess = await pool.acquire(); const exe = await leaseProcess.getExeUnit(); - setTimeout(() => leaseProcess.abort(), 8_000); - await expect(exe.run("sleep 10 && echo Hello World")).rejects.toThrow( - new GolemAbortError("Processing of batch execution has been aborted"), - ); - await pool.drainAndClear(); - }); - - it("should abort lease during execution process by signal", async () => { - const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 1 }); - const ac = new AbortController(); - const leaseProcess = await pool.acquire(ac.signal); - const exe = await leaseProcess.getExeUnit(); - setTimeout(() => ac.abort(), 8_000); + setTimeout(() => leaseProcess.finalize(), 8_000); await expect(exe.run("sleep 10 && echo Hello World")).rejects.toThrow( - new GolemAbortError("Processing of batch execution has been aborted"), - ); - await pool.drainAndClear(); - }); - - it("should abort lease during execution process by timeout", async () => { - const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 1 }); - const ac = new AbortController(); - const leaseProcess = await pool.acquire(8_000); - const exe = await leaseProcess.getExeUnit(); - await expect(exe.run("sleep 10 && echo Hello World")).rejects.toThrow( - new GolemAbortError("Processing of batch execution has been aborted"), + new GolemAbortError("Processing of script execution has been aborted"), ); await pool.drainAndClear(); + return new Promise(async (res) => { + leaseProcess.events.on("finalized", async () => res(true)); + }); }); }); From 98c0e64c64238895cdc3752724d7058ca52848d1 Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Mon, 17 Jun 2024 20:44:12 +0200 Subject: [PATCH 3/9] test: added e2e tests verifying aborting --- src/activity/exe-script-executor.ts | 5 ++++- tests/e2e/leaseProcessPool.spec.ts | 26 ++++++++++++++++++++------ 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/src/activity/exe-script-executor.ts b/src/activity/exe-script-executor.ts index c9c998be8..bc013577b 100644 --- a/src/activity/exe-script-executor.ts +++ b/src/activity/exe-script-executor.ts @@ -79,8 +79,11 @@ export class ExeScriptExecutor { reason: message, }); + if (this.abortSignal.aborted) { + throw new GolemAbortError("Executions of script has been aborted", this.abortSignal.reason); + } throw new GolemWorkError( - `Unable to execute script ${message}`, + `Unable to execute script. ${message}`, WorkErrorCode.ScriptExecutionFailed, this.activity.agreement, this.activity, diff --git a/tests/e2e/leaseProcessPool.spec.ts b/tests/e2e/leaseProcessPool.spec.ts index 53ed91f2c..fd6fb776c 100644 --- a/tests/e2e/leaseProcessPool.spec.ts +++ b/tests/e2e/leaseProcessPool.spec.ts @@ -167,17 +167,31 @@ describe("LeaseProcessPool", () => { await expect(pool.acquire(abortControler.signal)).rejects.toThrow("The signing of the agreement has been aborted"); }); - it("should finalize lease process during execution", async () => { + it("should abort acquiring lease process by timeout", async () => { + const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 1 }); + await expect(pool.acquire(1_000)).rejects.toThrow("Could not sign any agreement in time"); + }); + + it("should finalize the lease process during execution", async () => { const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 1 }); const leaseProcess = await pool.acquire(); const exe = await leaseProcess.getExeUnit(); - setTimeout(() => leaseProcess.finalize(), 8_000); - await expect(exe.run("sleep 10 && echo Hello World")).rejects.toThrow( - new GolemAbortError("Processing of script execution has been aborted"), - ); - await pool.drainAndClear(); return new Promise(async (res) => { leaseProcess.events.on("finalized", async () => res(true)); + setTimeout(() => leaseProcess.finalize(), 8_000); + await expect(exe.run("sleep 10 && echo Hello World")).rejects.toThrow( + new GolemAbortError("Processing of script execution has been aborted"), + ); }); }); + + it("should throw error while executing script on a finalized lease process", async () => { + const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 1 }); + const leaseProcess = await pool.acquire(); + const exe = await leaseProcess.getExeUnit(); + await leaseProcess.finalize(); + await expect(exe.run("echo Hello World")).rejects.toThrow( + new GolemAbortError("Executions of script has been aborted"), + ); + }); }); From b6286c492cbde9c2b0487634a46938fd2222c5d4 Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Mon, 17 Jun 2024 21:51:07 +0200 Subject: [PATCH 4/9] feat: added signalOrTimeout as CommandOptions in ExeUnit api --- src/activity/config.ts | 6 -- src/activity/exe-script-executor.test.ts | 20 +++--- src/activity/exe-script-executor.ts | 69 +++++++------------- src/activity/work/work.ts | 12 +++- src/market/draft-offer-proposal-pool.test.ts | 15 +++++ src/market/draft-offer-proposal-pool.ts | 4 +- tests/e2e/leaseProcessPool.spec.ts | 1 - tests/unit/work.test.ts | 21 +++--- 8 files changed, 69 insertions(+), 79 deletions(-) diff --git a/src/activity/config.ts b/src/activity/config.ts index 13bc9f9f6..c313c0c3d 100644 --- a/src/activity/config.ts +++ b/src/activity/config.ts @@ -1,8 +1,6 @@ import { ExecutionOptions } from "./exe-script-executor"; const DEFAULTS = { - activityRequestTimeout: 10000, - activityExecuteTimeout: 1000 * 60 * 5, // 5 min, activityExeBatchResultPollIntervalSeconds: 5, activityExeBatchResultMaxRetries: 20, }; @@ -11,14 +9,10 @@ const DEFAULTS = { * @internal */ export class ExecutionConfig { - public readonly activityRequestTimeout: number; - public readonly activityExecuteTimeout: number; public readonly activityExeBatchResultPollIntervalSeconds: number; public readonly activityExeBatchResultMaxRetries: number; constructor(options?: ExecutionOptions) { - this.activityRequestTimeout = options?.activityRequestTimeout || DEFAULTS.activityRequestTimeout; - this.activityExecuteTimeout = options?.activityExecuteTimeout || DEFAULTS.activityExecuteTimeout; this.activityExeBatchResultMaxRetries = options?.activityExeBatchResultMaxRetries || DEFAULTS.activityExeBatchResultMaxRetries; this.activityExeBatchResultPollIntervalSeconds = diff --git a/src/activity/exe-script-executor.test.ts b/src/activity/exe-script-executor.test.ts index bcc0fda77..491159733 100644 --- a/src/activity/exe-script-executor.test.ts +++ b/src/activity/exe-script-executor.test.ts @@ -4,7 +4,7 @@ import { Capture, Deploy, DownloadFile, Run, Script, Start, Terminate, UploadFil import { buildExeScriptSuccessResult } from "../../tests/unit/helpers"; import { GolemWorkError, WorkErrorCode } from "./work"; import { Logger, sleep } from "../shared/utils"; -import { GolemAbortError, GolemError, GolemTimeoutError } from "../shared/error/golem-error"; +import { GolemAbortError, GolemError } from "../shared/error/golem-error"; import { ExeScriptExecutor } from "./exe-script-executor"; import { StorageProvider } from "../shared/storage"; import { from, of, throwError } from "rxjs"; @@ -251,7 +251,7 @@ describe("ExeScriptExecutor", () => { ac.abort(); return new Promise((res) => { results.on("error", (error) => { - expect(error).toEqual(new GolemAbortError(`Processing of script execution has been aborted`)); + expect(error).toEqual(new GolemAbortError(`Execution of script has been aborted`)); return res(); }); results.on("data", () => null); @@ -283,7 +283,7 @@ describe("ExeScriptExecutor", () => { ac.abort(); return new Promise((res) => { results.on("error", (error) => { - expect(error).toEqual(new GolemAbortError(`Processing of script execution has been aborted`)); + expect(error).toEqual(new GolemAbortError(`Execution of script has been aborted`)); return res(); }); results.on("data", () => null); @@ -389,7 +389,7 @@ describe("ExeScriptExecutor", () => { .thenReject(error) .thenResolve([testResult]); - const results = await executor.execute(script.getExeScriptRequest(), false, 1_000, 10); + const results = await executor.execute(script.getExeScriptRequest(), false, undefined, 10); for await (const result of results) { expect(result).toEqual(testResult); @@ -449,8 +449,8 @@ describe("ExeScriptExecutor", () => { await sleep(10, true); return new Promise((res) => { results.on("error", (error: GolemWorkError) => { - expect(error).toBeInstanceOf(GolemTimeoutError); - expect(error.toString()).toMatch(/Error: Activity .* timeout/); + expect(error).toBeInstanceOf(GolemAbortError); + expect(error.toString()).toEqual("Error: Execution of script has been aborted"); return res(); }); // results.on("end", () => rej()); @@ -458,14 +458,14 @@ describe("ExeScriptExecutor", () => { }); }); - it("should handle timeout error while streaming batch", async () => { + it("should handle abort error while streaming batch", async () => { when(mockActivityModule.observeStreamingBatchEvents(anything(), anything())).thenReturn(of()); const executor = new ExeScriptExecutor( instance(mockActivity), instance(mockActivityModule), instance(mockLogger), { - activityExecuteTimeout: 1, + signalOrTimeout: 1, }, ); const command1 = new Deploy(); @@ -481,8 +481,8 @@ describe("ExeScriptExecutor", () => { const results = await executor.execute(script.getExeScriptRequest(), true, 800); return new Promise((res, rej) => { results.on("error", (error: GolemError) => { - expect(error).toBeInstanceOf(GolemTimeoutError); - expect(error.toString()).toMatch(/Error: Activity .* timeout/); + expect(error).toBeInstanceOf(GolemAbortError); + expect(error.toString()).toEqual("Error: Execution of script has been aborted"); return res(); }); results.on("end", () => rej()); diff --git a/src/activity/exe-script-executor.ts b/src/activity/exe-script-executor.ts index bc013577b..5acc7c1ec 100644 --- a/src/activity/exe-script-executor.ts +++ b/src/activity/exe-script-executor.ts @@ -3,7 +3,7 @@ import { ExecutionConfig } from "./config"; import { Readable } from "stream"; import { GolemWorkError, WorkErrorCode } from "./work"; import { withTimeout } from "../shared/utils/timeout"; -import { GolemAbortError, GolemTimeoutError } from "../shared/error/golem-error"; +import { GolemAbortError } from "../shared/error/golem-error"; import retry from "async-retry"; import { Result, StreamingBatchEvent } from "./results"; import sleep from "../shared/utils/sleep"; @@ -16,14 +16,11 @@ export interface ExeScriptRequest { } export interface ExecutionOptions { - /** timeout for sending and creating batch */ - activityRequestTimeout?: number; - /** timeout for executing batch */ - activityExecuteTimeout?: number; /** interval for fetching batch results while polling */ activityExeBatchResultPollIntervalSeconds?: number; /** maximum number of retries retrieving results when an error occurs, default: 10 */ activityExeBatchResultMaxRetries?: number; + /** The timeout in milliseconds or an AbortSignal that will be used to cancel the execution */ signalOrTimeout?: number | AbortSignal; } @@ -48,30 +45,34 @@ export class ExeScriptExecutor { * * @param script - exe script request * @param stream - define type of getting results from execution (polling or streaming) - * @param timeout - execution timeout + * @param signalOrTimeout - the timeout in milliseconds or an AbortSignal that will be used to cancel the execution * @param maxRetries - maximum number of retries retrieving results when an error occurs, default: 10 */ public async execute( script: ExeScriptRequest, stream?: boolean, - timeout?: number, + signalOrTimeout?: number | AbortSignal, maxRetries?: number, ): Promise { let batchId: string, batchSize: number; - let startTime = new Date(); + const abortController = new AbortController(); + this.abortSignal.addEventListener("abort", () => abortController.abort(this.abortSignal.reason)); + if (signalOrTimeout) { + const abortSignal = createAbortSignalFromTimeout(signalOrTimeout); + abortSignal.addEventListener("abort", () => abortController.abort(abortSignal.reason)); + } try { this.abortSignal.throwIfAborted(); batchId = await this.send(script); - startTime = new Date(); batchSize = JSON.parse(script.text).length; this.abortSignal.throwIfAborted(); this.logger.debug(`Script sent.`, { batchId }); return stream - ? this.streamingBatch(batchId, batchSize, startTime, timeout) - : this.pollingBatch(batchId, startTime, timeout, maxRetries); + ? this.streamingBatch(batchId, batchSize, abortController.signal) + : this.pollingBatch(batchId, abortController.signal, maxRetries); } catch (error) { const message = getMessageFromApiError(error); @@ -94,15 +95,10 @@ export class ExeScriptExecutor { } protected async send(script: ExeScriptRequest): Promise { - return withTimeout(this.activityModule.executeScript(this.activity, script), this.options.activityRequestTimeout); + return withTimeout(this.activityModule.executeScript(this.activity, script), 10_000); } - private async pollingBatch( - batchId: string, - startTime: Date, - timeout?: number, - maxRetries?: number, - ): Promise { + private async pollingBatch(batchId: string, abortSignal: AbortSignal, maxRetries?: number): Promise { this.logger.debug("Starting to poll for batch results"); let isBatchFinished = false; @@ -110,14 +106,13 @@ export class ExeScriptExecutor { const { id: activityId, agreement } = this.activity; - const { activityExecuteTimeout, activityExeBatchResultPollIntervalSeconds, activityExeBatchResultMaxRetries } = - this.options; - const { logger, activity, activityModule, abortSignal } = this; + const { activityExeBatchResultPollIntervalSeconds, activityExeBatchResultMaxRetries } = this.options; + const { logger, activity, activityModule } = this; return new Readable({ objectMode: true, async read() { - const abortError = new GolemAbortError("Processing of script execution has been aborted", abortSignal.reason); + const abortError = new GolemAbortError("Execution of script has been aborted", abortSignal.reason); abortSignal.addEventListener("abort", () => { logger.warn(abortError.message, { activityId: activity.id, batchId, reason: abortSignal.reason }); this.destroy(abortError); @@ -125,20 +120,13 @@ export class ExeScriptExecutor { while (!isBatchFinished && !abortSignal.aborted) { logger.debug("Polling for batch script execution result"); - if (startTime.valueOf() + (timeout || activityExecuteTimeout) <= new Date().valueOf()) { - logger.debug("Activity probably timed-out, will stop polling for batch execution results"); - return this.destroy(new GolemTimeoutError(`Activity ${activityId} timeout.`)); - } - try { const results = await retry( async (bail, attempt) => { logger.debug(`Trying to poll for batch execution results from yagna. Attempt: ${attempt}`); try { if (abortSignal.aborted) { - return bail( - new GolemAbortError(`Processing of script execution has been aborted`, abortSignal.reason), - ); + return bail(abortError); } return await activityModule.getBatchResults( activity, @@ -174,7 +162,7 @@ export class ExeScriptExecutor { } } catch (error) { if (abortSignal.aborted) { - return; + return this.destroy(abortError); } logger.error(`Processing script execution results failed`, error); @@ -193,20 +181,14 @@ export class ExeScriptExecutor { } } if (abortSignal.aborted) { - this.destroy(abortError); - return; + return this.destroy(abortError); } this.push(null); }, }); } - private async streamingBatch( - batchId: string, - batchSize: number, - startTime: Date, - timeout?: number, - ): Promise { + private async streamingBatch(batchId: string, batchSize: number, abortSignal: AbortSignal): Promise { const errors: object[] = []; const results: Result[] = []; @@ -217,24 +199,19 @@ export class ExeScriptExecutor { }); let isBatchFinished = false; - const activityExecuteTimeout = this.options.activityExecuteTimeout; - const { logger, activity, abortSignal } = this; + const { logger, activity } = this; return new Readable({ objectMode: true, async read() { - const abortError = new GolemAbortError("Processing of script execution has been aborted", abortSignal.reason); + const abortError = new GolemAbortError("Execution of script has been aborted", abortSignal.reason); abortSignal.addEventListener("abort", () => { logger.warn(abortError.message, { activityId: activity.id, batchId, reason: abortSignal.reason }); this.destroy(abortError); }); while (!isBatchFinished && !abortSignal.aborted) { let error: Error | undefined; - if (startTime.valueOf() + (timeout || activityExecuteTimeout) <= new Date().valueOf()) { - logger.debug("Activity probably timed-out, will stop streaming batch execution results"); - error = new GolemTimeoutError(`Activity ${activity.id} timeout.`); - } if (abortSignal.aborted) { error = abortError; diff --git a/src/activity/work/work.ts b/src/activity/work/work.ts index a713ed0d9..d9b51b198 100644 --- a/src/activity/work/work.ts +++ b/src/activity/work/work.ts @@ -43,7 +43,8 @@ export interface WorkOptions { } export interface CommandOptions { - timeout?: number; + signalOrTimeout?: number | AbortSignal; + maxRetries?: number; env?: object; capture?: Capture; } @@ -267,7 +268,7 @@ export class WorkContext { // In this case, the script consists only of one run command, // so we skip the execution of script.before and script.after const streamOfActivityResults = await this.executor - .execute(script.getExeScriptRequest(), true, options?.timeout) + .execute(script.getExeScriptRequest(), true, options?.signalOrTimeout, options?.maxRetries) .catch((e) => { throw new GolemWorkError( `Script execution failed for command: ${JSON.stringify(run.toJson())}. ${ @@ -414,7 +415,12 @@ export class WorkContext { await sleep(100, true); // Send script. - const results = await this.executor.execute(script.getExeScriptRequest(), false, options?.timeout); + const results = await this.executor.execute( + script.getExeScriptRequest(), + false, + options?.signalOrTimeout, + options?.maxRetries, + ); // Process result. let allResults: Result[] = []; diff --git a/src/market/draft-offer-proposal-pool.test.ts b/src/market/draft-offer-proposal-pool.test.ts index 8005e82ce..e4dc54b26 100644 --- a/src/market/draft-offer-proposal-pool.test.ts +++ b/src/market/draft-offer-proposal-pool.test.ts @@ -1,6 +1,7 @@ import { DraftOfferProposalPool } from "./draft-offer-proposal-pool"; import { instance, mock, when } from "@johanblumenberg/ts-mockito"; import { OfferProposal } from "./index"; +import { GolemAbortError, GolemTimeoutError } from "../shared/error/golem-error"; describe("Draft Offer Proposal Pool", () => { // GIVEN @@ -107,6 +108,20 @@ describe("Draft Offer Proposal Pool", () => { expect(a).toBe(proposal2); }); }); + describe("Negative cases", () => { + it("should abort the acquiring proposal by timeout", async () => { + const pool = new DraftOfferProposalPool(); + await expect(pool.acquire(1)).rejects.toThrow(new GolemTimeoutError("Could not provide any proposal in time")); + }); + it("should abort the acquiring proposal by signal", async () => { + const pool = new DraftOfferProposalPool(); + const ac = new AbortController(); + ac.abort(); + await expect(pool.acquire(ac.signal)).rejects.toThrow( + new GolemAbortError("The acquiring of proposals has been aborted"), + ); + }); + }); }); describe("Is ready", () => { diff --git a/src/market/draft-offer-proposal-pool.ts b/src/market/draft-offer-proposal-pool.ts index e13e13cd9..db330c28c 100644 --- a/src/market/draft-offer-proposal-pool.ts +++ b/src/market/draft-offer-proposal-pool.ts @@ -106,7 +106,7 @@ export class DraftOfferProposalPool { /** * Attempts to obtain a single proposal from the poolonds. - * @param TODO + * @param signalOrTimeout - the timeout in milliseconds or an AbortSignal that will be used to cancel the acquiring */ public acquire(signalOrTimeout?: number | AbortSignal): Promise { const signal = createAbortSignalFromTimeout(signalOrTimeout); @@ -117,7 +117,7 @@ export class DraftOfferProposalPool { if (signal.aborted) { throw signal.reason.name === "TimeoutError" ? new GolemTimeoutError("Could not provide any proposal in time") - : new GolemAbortError("The acquiring of proposals has been interrupted", signal.reason); + : new GolemAbortError("The acquiring of proposals has been aborted", signal.reason); } // Try to get one proposal = this.available.size > 0 ? this.selectProposal([...this.available]) : null; diff --git a/tests/e2e/leaseProcessPool.spec.ts b/tests/e2e/leaseProcessPool.spec.ts index fd6fb776c..3ba985738 100644 --- a/tests/e2e/leaseProcessPool.spec.ts +++ b/tests/e2e/leaseProcessPool.spec.ts @@ -1,6 +1,5 @@ import { Subscription } from "rxjs"; import { Allocation, DraftOfferProposalPool, GolemAbortError, GolemNetwork } from "../../src"; -import exp from "node:constants"; describe("LeaseProcessPool", () => { const glm = new GolemNetwork(); diff --git a/tests/unit/work.test.ts b/tests/unit/work.test.ts index 85689d8ba..3ce1a1b61 100644 --- a/tests/unit/work.test.ts +++ b/tests/unit/work.test.ts @@ -14,7 +14,6 @@ import { } from "../../src"; import { _, anyOfClass, anything, imock, instance, mock, reset, verify, when } from "@johanblumenberg/ts-mockito"; import { buildExecutorResults, buildExeScriptErrorResult, buildExeScriptSuccessResult } from "./helpers"; -import { IPv4 } from "ip-num"; import { StorageProviderDataCallback } from "../../src/shared/storage/provider"; import { ActivityApi } from "ya-ts-client"; import { ExeScriptExecutor } from "../../src/activity/exe-script-executor"; @@ -55,7 +54,7 @@ describe("Work Context", () => { describe("Executing", () => { it("should execute run command with a single parameter", async () => { - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, @@ -75,7 +74,7 @@ describe("Work Context", () => { }); it("should execute run command with multiple parameters", async () => { - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, @@ -99,7 +98,7 @@ describe("Work Context", () => { it("should execute upload file command", async () => { const worker = async (ctx: WorkContext) => ctx.uploadFile("./file.txt", "/golem/file.txt"); - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, @@ -123,7 +122,7 @@ describe("Work Context", () => { it("should execute upload json command", async () => { const worker = async (ctx: WorkContext) => ctx.uploadJson({ test: true }, "/golem/file.txt"); - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, @@ -150,7 +149,7 @@ describe("Work Context", () => { it("should execute download file command", async () => { const worker = async (ctx: WorkContext) => ctx.downloadFile("/golem/file.txt", "./file.txt"); - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, @@ -178,7 +177,7 @@ describe("Work Context", () => { const jsonStr = JSON.stringify(json); const encoded = new TextEncoder().encode(jsonStr); - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, @@ -211,7 +210,7 @@ describe("Work Context", () => { when(mockStorageProvider.receiveData(anything())).thenResolve(data.toString()); - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, @@ -248,7 +247,7 @@ describe("Work Context", () => { describe("Exec and stream", () => { it("should execute runAndStream command", async () => { const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule)); - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, @@ -269,7 +268,7 @@ describe("Work Context", () => { it("should execute transfer command", async () => { const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule)); - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, @@ -425,7 +424,7 @@ describe("Work Context", () => { const eventDate = new Date().toISOString(); - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, From ec0b4815a4a7e9653d1f3261bcf237d591fa830e Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Tue, 18 Jun 2024 09:15:59 +0200 Subject: [PATCH 5/9] refactor: added typedoc comments and small fixes --- src/activity/exe-script-executor.ts | 6 +++--- src/activity/work/work.ts | 6 +++--- src/golem-network/golem-network.ts | 1 + src/lease-process/lease-process-pool.ts | 5 ++++- src/lease-process/lease-process.ts | 14 +++++++------- src/market/market.module.ts | 2 +- 6 files changed, 19 insertions(+), 15 deletions(-) diff --git a/src/activity/exe-script-executor.ts b/src/activity/exe-script-executor.ts index 5acc7c1ec..06c8b7e4d 100644 --- a/src/activity/exe-script-executor.ts +++ b/src/activity/exe-script-executor.ts @@ -63,11 +63,11 @@ export class ExeScriptExecutor { } try { - this.abortSignal.throwIfAborted(); + abortController.signal.throwIfAborted(); batchId = await this.send(script); batchSize = JSON.parse(script.text).length; - this.abortSignal.throwIfAborted(); + abortController.signal.throwIfAborted(); this.logger.debug(`Script sent.`, { batchId }); return stream @@ -80,7 +80,7 @@ export class ExeScriptExecutor { reason: message, }); - if (this.abortSignal.aborted) { + if (abortController.signal.aborted) { throw new GolemAbortError("Executions of script has been aborted", this.abortSignal.reason); } throw new GolemWorkError( diff --git a/src/activity/work/work.ts b/src/activity/work/work.ts index d9b51b198..ffb2dc3db 100644 --- a/src/activity/work/work.ts +++ b/src/activity/work/work.ts @@ -18,7 +18,7 @@ import { Batch } from "./batch"; import { NetworkNode } from "../../network"; import { RemoteProcess } from "./process"; import { GolemWorkError, WorkErrorCode } from "./error"; -import { GolemConfigError, GolemTimeoutError } from "../../shared/error/golem-error"; +import { GolemAbortError, GolemConfigError, GolemTimeoutError } from "../../shared/error/golem-error"; import { Agreement, ProviderInfo } from "../../market/agreement"; import { TcpProxy } from "../../network/tcpProxy"; import { ExecutionOptions, ExeScriptExecutor } from "../exe-script-executor"; @@ -93,7 +93,7 @@ export class WorkContext { private async fetchState(): Promise { if (this.abortSignal.aborted) { - return ActivityStateEnum.Unknown; + throw new GolemAbortError("ExeUnit has been aborted"); } return this.activityModule .refreshActivity(this.activity) @@ -162,7 +162,7 @@ export class WorkContext { if (this.abortSignal.aborted) { const message = "Initializing of activity has been aborted"; this.logger.warn(message, { activityId: this.activity.id, reason: this.abortSignal.reason }); - return; + throw new GolemAbortError(message, this.abortSignal.reason); } if (error instanceof GolemWorkError) { throw error; diff --git a/src/golem-network/golem-network.ts b/src/golem-network/golem-network.ts index 58a26e7f4..c5e35e3e6 100644 --- a/src/golem-network/golem-network.ts +++ b/src/golem-network/golem-network.ts @@ -330,6 +330,7 @@ export class GolemNetwork { * ``` * * @param order + * @param signalOrTimeout - the timeout in milliseconds or an AbortSignal that will be used to cancel the lease request */ async oneOf(order: MarketOrderSpec, signalOrTimeout?: number | AbortSignal): Promise { const proposalPool = new DraftOfferProposalPool({ diff --git a/src/lease-process/lease-process-pool.ts b/src/lease-process/lease-process-pool.ts index 31a1a4a51..b65542cd6 100644 --- a/src/lease-process/lease-process-pool.ts +++ b/src/lease-process/lease-process-pool.ts @@ -179,6 +179,7 @@ export class LeaseProcessPool { /** * Borrow a lease process from the pool. If there is no valid lease process a new one will be created. + * @param signalOrTimeout - the timeout in milliseconds or an AbortSignal that will be used to cancel the lease request */ async acquire(signalOrTimeout?: number | AbortSignal): Promise { if (this.isDraining) { @@ -229,7 +230,7 @@ export class LeaseProcessPool { async destroy(leaseProcess: LeaseProcess): Promise { try { this.borrowed.delete(leaseProcess); - this.logger.info("Destroying lease process from the pool", { agreementId: leaseProcess.agreement.id }); + this.logger.debug("Destroying lease process from the pool", { agreementId: leaseProcess.agreement.id }); await Promise.all([leaseProcess.finalize(), this.removeNetworkNode(leaseProcess)]); this.events.emit("destroyed", leaseProcess.agreement); } catch (error) { @@ -352,6 +353,8 @@ export class LeaseProcessPool { * // even if an error is thrown in the callback * }); * ``` + * @param callback - a function that takes a `lease` object as its argument. The lease is automatically released after the callback is executed, regardless of whether it completes successfully or throws an error. + * @param signalOrTimeout - the timeout in milliseconds or an AbortSignal that will be used to cancel the lease request */ public async withLease( callback: (lease: LeaseProcess) => Promise, diff --git a/src/lease-process/lease-process.ts b/src/lease-process/lease-process.ts index d5866c030..0ee914fe8 100644 --- a/src/lease-process/lease-process.ts +++ b/src/lease-process/lease-process.ts @@ -34,7 +34,7 @@ export class LeaseProcess { private currentWorkContext: WorkContext | null = null; private abortController = new AbortController(); - private finalizeTask?: Promise; + private finalizePromise?: Promise; private asyncLock = new AsyncLock(); public constructor( @@ -56,11 +56,11 @@ export class LeaseProcess { * If the lease is already finalized, it will resolve immediately. */ async finalize() { + // Prevent this task from being performed more than once return this.asyncLock.acquire("finalize", () => { - // Prevent this task from being performed more than once - if (!this.finalizeTask) { - this.finalizeTask = (async () => { - this.abortController.abort("Finalising the leasing rocess"); + if (!this.finalizePromise) { + this.finalizePromise = (async () => { + this.abortController.abort("The lease process is finalizing"); if (this.paymentProcess.isFinished()) { return; } @@ -82,7 +82,7 @@ export class LeaseProcess { } })(); } - return this.finalizeTask; + return this.finalizePromise; }); } @@ -94,7 +94,7 @@ export class LeaseProcess { * Creates an activity on the Provider, and returns a work context that can be used to operate within the activity */ async getExeUnit(): Promise { - if (this.finalizeTask || this.abortController.signal.aborted) { + if (this.finalizePromise || this.abortController.signal.aborted) { throw new GolemUserError("The lease process is not active. It may have been aborted or finalized"); } if (this.currentWorkContext) { diff --git a/src/market/market.module.ts b/src/market/market.module.ts index 5820f941e..cdedf763e 100644 --- a/src/market/market.module.ts +++ b/src/market/market.module.ts @@ -473,7 +473,7 @@ export class MarketModuleImpl implements MarketModule { const getProposal = async () => { try { signal.throwIfAborted(); - const proposal = await draftProposalPool.acquire(signalOrTimeout); + const proposal = await draftProposalPool.acquire(signal); if (signal.aborted) { await draftProposalPool.release(proposal); signal.throwIfAborted(); From 584fff68c75e043a567fe434910e7d315fc75eac Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Tue, 18 Jun 2024 09:51:53 +0200 Subject: [PATCH 6/9] chore: added comment --- src/activity/exe-script-executor.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/activity/exe-script-executor.ts b/src/activity/exe-script-executor.ts index 06c8b7e4d..7d7f4093e 100644 --- a/src/activity/exe-script-executor.ts +++ b/src/activity/exe-script-executor.ts @@ -56,6 +56,7 @@ export class ExeScriptExecutor { ): Promise { let batchId: string, batchSize: number; const abortController = new AbortController(); + // abort execution in case of cancellation by global signal or by local signal (from parameter) this.abortSignal.addEventListener("abort", () => abortController.abort(this.abortSignal.reason)); if (signalOrTimeout) { const abortSignal = createAbortSignalFromTimeout(signalOrTimeout); From 9d81b8b3d64906c2ec97c7a8fb9947a904541384 Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Tue, 18 Jun 2024 10:13:10 +0200 Subject: [PATCH 7/9] test: fixed e2e test --- tests/e2e/leaseProcessPool.spec.ts | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/tests/e2e/leaseProcessPool.spec.ts b/tests/e2e/leaseProcessPool.spec.ts index 3ba985738..1357f1c4a 100644 --- a/tests/e2e/leaseProcessPool.spec.ts +++ b/tests/e2e/leaseProcessPool.spec.ts @@ -179,18 +179,8 @@ describe("LeaseProcessPool", () => { leaseProcess.events.on("finalized", async () => res(true)); setTimeout(() => leaseProcess.finalize(), 8_000); await expect(exe.run("sleep 10 && echo Hello World")).rejects.toThrow( - new GolemAbortError("Processing of script execution has been aborted"), + new GolemAbortError("Execution of script has been aborted"), ); }); }); - - it("should throw error while executing script on a finalized lease process", async () => { - const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 1 }); - const leaseProcess = await pool.acquire(); - const exe = await leaseProcess.getExeUnit(); - await leaseProcess.finalize(); - await expect(exe.run("echo Hello World")).rejects.toThrow( - new GolemAbortError("Executions of script has been aborted"), - ); - }); }); From 95d5279a07ee061699cb0558393cc4be7ec25173 Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Wed, 19 Jun 2024 10:55:21 +0200 Subject: [PATCH 8/9] refactor: removed async-loc from finalize lease pool --- src/activity/activity.module.ts | 5 +-- src/lease-process/lease-process.ts | 50 ++++++++++++------------- src/market/draft-offer-proposal-pool.ts | 2 +- tests/e2e/leaseProcessPool.spec.ts | 1 + 4 files changed, 26 insertions(+), 32 deletions(-) diff --git a/src/activity/activity.module.ts b/src/activity/activity.module.ts index 5ad923b9c..b4cf8a32b 100644 --- a/src/activity/activity.module.ts +++ b/src/activity/activity.module.ts @@ -248,10 +248,7 @@ export class ActivityModuleImpl implements ActivityModule { "Initializing of the exe-unit has been aborted due to a timeout", options.signalOrTimeout.reason, ) - : new GolemAbortError( - "Initializing of the exe-unit has been aborted by a signal", - options.signalOrTimeout.reason, - ); + : new GolemAbortError("Initializing of the exe-unit has been aborted", options.signalOrTimeout.reason); throw error; } throw error; diff --git a/src/lease-process/lease-process.ts b/src/lease-process/lease-process.ts index 0ee914fe8..22f7879b6 100644 --- a/src/lease-process/lease-process.ts +++ b/src/lease-process/lease-process.ts @@ -9,7 +9,6 @@ import { NetworkNode } from "../network"; import { ExecutionOptions } from "../activity/exe-script-executor"; import { MarketModule } from "../market"; import { GolemUserError } from "../shared/error/golem-error"; -import AsyncLock from "async-lock"; export interface LeaseProcessEvents { /** @@ -35,7 +34,6 @@ export class LeaseProcess { private currentWorkContext: WorkContext | null = null; private abortController = new AbortController(); private finalizePromise?: Promise; - private asyncLock = new AsyncLock(); public constructor( public readonly agreement: Agreement, @@ -57,33 +55,31 @@ export class LeaseProcess { */ async finalize() { // Prevent this task from being performed more than once - return this.asyncLock.acquire("finalize", () => { - if (!this.finalizePromise) { - this.finalizePromise = (async () => { - this.abortController.abort("The lease process is finalizing"); - if (this.paymentProcess.isFinished()) { - return; + if (!this.finalizePromise) { + this.finalizePromise = (async () => { + this.abortController.abort("The lease process is finalizing"); + if (this.paymentProcess.isFinished()) { + return; + } + try { + this.logger.info("Waiting for payment process of agreement to finish", { agreementId: this.agreement.id }); + if (this.currentWorkContext) { + await this.activityModule.destroyActivity(this.currentWorkContext.activity); } - try { - this.logger.info("Waiting for payment process of agreement to finish", { agreementId: this.agreement.id }); - if (this.currentWorkContext) { - await this.activityModule.destroyActivity(this.currentWorkContext.activity); - } - if ((await this.fetchAgreementState()) !== "Terminated") { - await this.marketModule.terminateAgreement(this.agreement); - } - await waitForCondition(() => this.paymentProcess.isFinished()); - this.logger.info("Payment process for agreement finalized", { agreementId: this.agreement.id }); - } catch (error) { - this.logger.error("Payment process finalization failed", { agreementId: this.agreement.id, error }); - throw error; - } finally { - this.events.emit("finalized"); + if ((await this.fetchAgreementState()) !== "Terminated") { + await this.marketModule.terminateAgreement(this.agreement); } - })(); - } - return this.finalizePromise; - }); + await waitForCondition(() => this.paymentProcess.isFinished()); + this.logger.info("Payment process for agreement finalized", { agreementId: this.agreement.id }); + } catch (error) { + this.logger.error("Payment process finalization failed", { agreementId: this.agreement.id, error }); + throw error; + } finally { + this.events.emit("finalized"); + } + })(); + } + return this.finalizePromise; } public hasActivity(): boolean { diff --git a/src/market/draft-offer-proposal-pool.ts b/src/market/draft-offer-proposal-pool.ts index db330c28c..71a02a186 100644 --- a/src/market/draft-offer-proposal-pool.ts +++ b/src/market/draft-offer-proposal-pool.ts @@ -105,7 +105,7 @@ export class DraftOfferProposalPool { } /** - * Attempts to obtain a single proposal from the poolonds. + * Attempts to obtain a single proposal from the pool * @param signalOrTimeout - the timeout in milliseconds or an AbortSignal that will be used to cancel the acquiring */ public acquire(signalOrTimeout?: number | AbortSignal): Promise { diff --git a/tests/e2e/leaseProcessPool.spec.ts b/tests/e2e/leaseProcessPool.spec.ts index 485586f7b..50e94fb1b 100644 --- a/tests/e2e/leaseProcessPool.spec.ts +++ b/tests/e2e/leaseProcessPool.spec.ts @@ -188,6 +188,7 @@ describe("LeaseProcessPool", () => { }); it("should finalize the lease process during execution", async () => { + expect.assertions(1); const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 1 }); const leaseProcess = await pool.acquire(); const exe = await leaseProcess.getExeUnit(); From d85ecdc3184b18565c359639437f9320322cccd9 Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Wed, 19 Jun 2024 12:05:19 +0200 Subject: [PATCH 9/9] refactor: moved catching AbortError for initialization to work-context --- src/activity/activity.module.ts | 11 -- src/activity/work/work.ts | 177 ++++++++++++++++---------------- 2 files changed, 91 insertions(+), 97 deletions(-) diff --git a/src/activity/activity.module.ts b/src/activity/activity.module.ts index b4cf8a32b..a59ec1e35 100644 --- a/src/activity/activity.module.ts +++ b/src/activity/activity.module.ts @@ -7,7 +7,6 @@ import { WorkContext, WorkOptions } from "./work"; import { ExeScriptExecutor, ExeScriptRequest, ExecutionOptions } from "./exe-script-executor"; import { Observable, catchError, tap } from "rxjs"; import { StreamingBatchEvent } from "./results"; -import { GolemAbortError, GolemTimeoutError } from "../shared/error/golem-error"; export interface ActivityModule { events: EventEmitter; @@ -241,16 +240,6 @@ export class ActivityModuleImpl implements ActivityModule { return ctx; } catch (error) { this.events.emit("errorInitializingActivity", activity, error); - if (options?.signalOrTimeout instanceof AbortSignal && options.signalOrTimeout.aborted) { - const error = - options.signalOrTimeout.reason.name === "TimeoutError" - ? new GolemTimeoutError( - "Initializing of the exe-unit has been aborted due to a timeout", - options.signalOrTimeout.reason, - ) - : new GolemAbortError("Initializing of the exe-unit has been aborted", options.signalOrTimeout.reason); - throw error; - } throw error; } } diff --git a/src/activity/work/work.ts b/src/activity/work/work.ts index ffb2dc3db..ce47701e7 100644 --- a/src/activity/work/work.ts +++ b/src/activity/work/work.ts @@ -26,13 +26,11 @@ import { ExecutionOptions, ExeScriptExecutor } from "../exe-script-executor"; export type Worker = (ctx: WorkContext) => Promise; const DEFAULTS = { - activityPreparingTimeout: 300_000, - activityStateCheckInterval: 1000, + activityDeployingTimeout: 300_000, }; export interface WorkOptions { - activityPreparingTimeout?: number; - activityStateCheckingInterval?: number; + activityDeployingTimeout?: number; storageProvider?: StorageProvider; networkNode?: NetworkNode; logger?: Logger; @@ -59,8 +57,7 @@ export interface ActivityDTO { * Groups most common operations that the requestors might need to implement their workflows */ export class WorkContext { - private readonly activityPreparingTimeout: number; - private readonly activityStateCheckingInterval: number; + private readonly activityDeployingTimeout: number; public readonly provider: ProviderInfo; private readonly logger: Logger; @@ -76,8 +73,7 @@ export class WorkContext { public readonly activityModule: ActivityModule, private options?: WorkOptions, ) { - this.activityPreparingTimeout = options?.activityPreparingTimeout || DEFAULTS.activityPreparingTimeout; - this.activityStateCheckingInterval = options?.activityStateCheckingInterval || DEFAULTS.activityStateCheckInterval; + this.activityDeployingTimeout = options?.activityDeployingTimeout || DEFAULTS.activityDeployingTimeout; this.logger = options?.logger ?? defaultLogger("work"); this.provider = activity.getProviderInfo(); @@ -111,89 +107,98 @@ export class WorkContext { } async before(): Promise { - let state = await this.fetchState(); - if (state === ActivityStateEnum.Ready) { - await this.setupActivity(); - return; - } - - if (state === ActivityStateEnum.Initialized) { - const result = await this.executor - .execute( - new Script([new Deploy(this.networkNode?.getNetworkConfig?.()), new Start()]).getExeScriptRequest(), - undefined, - this.activityPreparingTimeout, - ) - .catch((e) => { - throw new GolemWorkError( - `Unable to deploy activity. ${e}`, - WorkErrorCode.ActivityDeploymentFailed, - this.activity.agreement, - this.activity, - this.activity.getProviderInfo(), - e, - ); - }); - - let timeoutId: NodeJS.Timeout; - - await Promise.race([ - new Promise( - (res, rej) => - (timeoutId = setTimeout( - () => rej(new GolemTimeoutError("Preparing activity timeout")), - this.activityPreparingTimeout, - )), - ), - (async () => { - for await (const res of result) { - if (res.result === "Error") - throw new GolemWorkError( - `Preparing activity failed. Error: ${res.message}`, - WorkErrorCode.ActivityDeploymentFailed, - this.activity.agreement, - this.activity, - this.activity.getProviderInfo(), - ); - } - })(), - ]) - .catch((error) => { - if (this.abortSignal.aborted) { - const message = "Initializing of activity has been aborted"; - this.logger.warn(message, { activityId: this.activity.id, reason: this.abortSignal.reason }); - throw new GolemAbortError(message, this.abortSignal.reason); - } - if (error instanceof GolemWorkError) { - throw error; - } - throw new GolemWorkError( - `Preparing activity failed. Error: ${error.toString()}`, - WorkErrorCode.ActivityDeploymentFailed, - this.activity.agreement, - this.activity, - this.activity.getProviderInfo(), - error, - ); - }) - .finally(() => clearTimeout(timeoutId)); - } + try { + let state = await this.fetchState(); + if (state === ActivityStateEnum.Ready) { + await this.setupActivity(); + return; + } - await sleep(this.activityStateCheckingInterval, true); + if (state === ActivityStateEnum.Initialized) { + await this.deployActivity(); + } - state = await this.fetchState(); + await sleep(1000, true); + state = await this.fetchState(); - if (state !== ActivityStateEnum.Ready) { - throw new GolemWorkError( - `Activity ${this.activity.id} cannot reach the Ready state. Current state: ${state}`, - WorkErrorCode.ActivityDeploymentFailed, - this.activity.agreement, - this.activity, - this.activity.getProviderInfo(), - ); + if (state !== ActivityStateEnum.Ready) { + throw new GolemWorkError( + `Activity ${this.activity.id} cannot reach the Ready state. Current state: ${state}`, + WorkErrorCode.ActivityDeploymentFailed, + this.activity.agreement, + this.activity, + this.activity.getProviderInfo(), + ); + } + await this.setupActivity(); + } catch (error) { + if (this.abortSignal.aborted) { + throw this.abortSignal.reason.name === "TimeoutError" + ? new GolemTimeoutError( + "Initializing of the exe-unit has been aborted due to a timeout", + this.abortSignal.reason, + ) + : new GolemAbortError("Initializing of the exe-unit has been aborted", this.abortSignal.reason); + } + throw error; } + } + + private async deployActivity() { + const result = await this.executor + .execute( + new Script([new Deploy(this.networkNode?.getNetworkConfig?.()), new Start()]).getExeScriptRequest(), + undefined, + this.activityDeployingTimeout, + ) + .catch((e) => { + throw new GolemWorkError( + `Unable to deploy activity. ${e}`, + WorkErrorCode.ActivityDeploymentFailed, + this.activity.agreement, + this.activity, + this.activity.getProviderInfo(), + e, + ); + }); - await this.setupActivity(); + let timeoutId: NodeJS.Timeout; + + await Promise.race([ + new Promise( + (res, rej) => + (timeoutId = setTimeout( + () => rej(new GolemTimeoutError("Deploing activity has been aborted due to a timeout")), + this.activityDeployingTimeout, + )), + ), + (async () => { + for await (const res of result) { + if (res.result === "Error") + throw new GolemWorkError( + `Deploing activity failed. Error: ${res.message}`, + WorkErrorCode.ActivityDeploymentFailed, + this.activity.agreement, + this.activity, + this.activity.getProviderInfo(), + ); + } + })(), + ]) + .catch((error) => { + if (error instanceof GolemWorkError) { + throw error; + } + throw new GolemWorkError( + `Deploing activity failed. Error: ${error.toString()}`, + WorkErrorCode.ActivityDeploymentFailed, + this.activity.agreement, + this.activity, + this.activity.getProviderInfo(), + error, + ); + }) + .finally(() => clearTimeout(timeoutId)); } private async setupActivity() {