diff --git a/src/activity/activity.module.ts b/src/activity/activity.module.ts index 626cffadb..be982e272 100644 --- a/src/activity/activity.module.ts +++ b/src/activity/activity.module.ts @@ -187,7 +187,7 @@ export class ActivityModuleImpl implements ActivityModule { batchId: string, commandIndex?: number | undefined, ): Observable { - this.logger.info("Observing streaming batch events", { activityId: activity.id, batchId }); + this.logger.debug("Observing streaming batch events", { activityId: activity.id, batchId }); return this.activityApi.getExecBatchEvents(activity, batchId, commandIndex).pipe( tap(async (event) => { this.events.emit( diff --git a/src/activity/config.ts b/src/activity/config.ts index 13bc9f9f6..c313c0c3d 100644 --- a/src/activity/config.ts +++ b/src/activity/config.ts @@ -1,8 +1,6 @@ import { ExecutionOptions } from "./exe-script-executor"; const DEFAULTS = { - activityRequestTimeout: 10000, - activityExecuteTimeout: 1000 * 60 * 5, // 5 min, activityExeBatchResultPollIntervalSeconds: 5, activityExeBatchResultMaxRetries: 20, }; @@ -11,14 +9,10 @@ const DEFAULTS = { * @internal */ export class ExecutionConfig { - public readonly activityRequestTimeout: number; - public readonly activityExecuteTimeout: number; public readonly activityExeBatchResultPollIntervalSeconds: number; public readonly activityExeBatchResultMaxRetries: number; constructor(options?: ExecutionOptions) { - this.activityRequestTimeout = options?.activityRequestTimeout || DEFAULTS.activityRequestTimeout; - this.activityExecuteTimeout = options?.activityExecuteTimeout || DEFAULTS.activityExecuteTimeout; this.activityExeBatchResultMaxRetries = options?.activityExeBatchResultMaxRetries || DEFAULTS.activityExeBatchResultMaxRetries; this.activityExeBatchResultPollIntervalSeconds = diff --git a/src/activity/exe-script-executor.test.ts b/src/activity/exe-script-executor.test.ts index d26ef1b33..7f7d19f2a 100644 --- a/src/activity/exe-script-executor.test.ts +++ b/src/activity/exe-script-executor.test.ts @@ -4,7 +4,7 @@ import { Capture, Deploy, DownloadFile, Run, Script, Start, Terminate, UploadFil import { buildExeScriptSuccessResult } from "../../tests/unit/helpers"; import { GolemWorkError, WorkErrorCode } from "./work"; import { Logger, sleep } from "../shared/utils"; -import { GolemError, GolemTimeoutError } from "../shared/error/golem-error"; +import { GolemAbortError, GolemError } from "../shared/error/golem-error"; import { ExeScriptExecutor } from "./exe-script-executor"; import { StorageProvider } from "../shared/storage"; import { from, of, throwError } from "rxjs"; @@ -86,7 +86,6 @@ describe("ExeScriptExecutor", () => { } await script.after([]); - await executor.stop(); }); it("should execute script and get results by events", async () => { @@ -131,7 +130,6 @@ describe("ExeScriptExecutor", () => { }); results.on("end", async () => { await script.after([]); - await executor.stop(); expect(resultCount).toEqual(6); return res(); }); @@ -229,16 +227,17 @@ describe("ExeScriptExecutor", () => { } expect(expectedStdout).toEqual("test"); await script.after([]); - await executor.stop(); }); }); describe("Cancelling", () => { it("should cancel executor", async () => { + const ac = new AbortController(); const executor = new ExeScriptExecutor( instance(mockActivity), instance(mockActivityModule), instance(mockLogger), + { signalOrTimeout: ac.signal }, ); const command1 = new Deploy(); const command2 = new Start(); @@ -249,10 +248,10 @@ describe("ExeScriptExecutor", () => { const script = Script.create([command1, command2, command3, command4, command5, command6]); await script.before(); const results = await executor.execute(script.getExeScriptRequest(), undefined, undefined); - await executor.stop(); + ac.abort(); return new Promise((res) => { results.on("error", (error) => { - expect(error.toString()).toMatch(/Error: Activity .* has been interrupted/); + expect(error).toEqual(new GolemAbortError(`Execution of script has been aborted`)); return res(); }); results.on("data", () => null); @@ -261,10 +260,14 @@ describe("ExeScriptExecutor", () => { it("should cancel executor while streaming batch", async () => { when(mockActivityModule.observeStreamingBatchEvents(_, _)).thenReturn(of()); + const ac = new AbortController(); const executor = new ExeScriptExecutor( instance(mockActivity), instance(mockActivityModule), instance(mockLogger), + { + signalOrTimeout: ac.signal, + }, ); const command1 = new Deploy(); const command2 = new Start(); @@ -277,10 +280,10 @@ describe("ExeScriptExecutor", () => { const script = Script.create([command1, command2, command3, command4]); await script.before(); const results = await executor.execute(script.getExeScriptRequest(), true, undefined); - await executor.stop(); + ac.abort(); return new Promise((res) => { results.on("error", (error) => { - expect(error.toString()).toMatch(/Error: Activity .* has been interrupted/); + expect(error).toEqual(new GolemAbortError(`Execution of script has been aborted`)); return res(); }); results.on("data", () => null); @@ -386,7 +389,7 @@ describe("ExeScriptExecutor", () => { .thenReject(error) .thenResolve([testResult]); - const results = await executor.execute(script.getExeScriptRequest(), false, 1_000, 10); + const results = await executor.execute(script.getExeScriptRequest(), false, undefined, 10); for await (const result of results) { expect(result).toEqual(testResult); @@ -446,8 +449,8 @@ describe("ExeScriptExecutor", () => { await sleep(10, true); return new Promise((res) => { results.on("error", (error: GolemWorkError) => { - expect(error).toBeInstanceOf(GolemTimeoutError); - expect(error.toString()).toMatch(/Error: Activity .* timeout/); + expect(error).toBeInstanceOf(GolemAbortError); + expect(error.toString()).toEqual("Error: Execution of script has been aborted"); return res(); }); // results.on("end", () => rej()); @@ -455,14 +458,14 @@ describe("ExeScriptExecutor", () => { }); }); - it("should handle timeout error while streaming batch", async () => { + it("should handle abort error while streaming batch", async () => { when(mockActivityModule.observeStreamingBatchEvents(anything(), anything())).thenReturn(of()); const executor = new ExeScriptExecutor( instance(mockActivity), instance(mockActivityModule), instance(mockLogger), { - activityExecuteTimeout: 1, + signalOrTimeout: 1, }, ); const command1 = new Deploy(); @@ -478,8 +481,8 @@ describe("ExeScriptExecutor", () => { const results = await executor.execute(script.getExeScriptRequest(), true, 800); return new Promise((res, rej) => { results.on("error", (error: GolemError) => { - expect(error).toBeInstanceOf(GolemTimeoutError); - expect(error.toString()).toMatch(/Error: Activity .* timeout/); + expect(error).toBeInstanceOf(GolemAbortError); + expect(error.toString()).toEqual("Error: Execution of script has been aborted"); return res(); }); results.on("end", () => rej()); diff --git a/src/activity/exe-script-executor.ts b/src/activity/exe-script-executor.ts index ff9c00005..b8c00bc10 100644 --- a/src/activity/exe-script-executor.ts +++ b/src/activity/exe-script-executor.ts @@ -1,9 +1,9 @@ -import { Logger } from "../shared/utils"; +import { createAbortSignalFromTimeout, Logger } from "../shared/utils"; import { ExecutionConfig } from "./config"; import { Readable } from "stream"; import { GolemWorkError, WorkErrorCode } from "./work"; import { withTimeout } from "../shared/utils/timeout"; -import { GolemAbortError, GolemTimeoutError } from "../shared/error/golem-error"; +import { GolemAbortError } from "../shared/error/golem-error"; import retry from "async-retry"; import { Result, StreamingBatchEvent } from "./results"; import sleep from "../shared/utils/sleep"; @@ -16,21 +16,19 @@ export interface ExeScriptRequest { } export interface ExecutionOptions { - /** timeout for sending and creating batch */ - activityRequestTimeout?: number; - /** timeout for executing batch */ - activityExecuteTimeout?: number; /** interval for fetching batch results while polling */ activityExeBatchResultPollIntervalSeconds?: number; /** maximum number of retries retrieving results when an error occurs, default: 10 */ activityExeBatchResultMaxRetries?: number; + /** The timeout in milliseconds or an AbortSignal that will be used to cancel the execution */ + signalOrTimeout?: number | AbortSignal; } const RETRYABLE_ERROR_STATUS_CODES = [408, 500]; export class ExeScriptExecutor { - private isRunning = false; private readonly options: ExecutionConfig; + private readonly abortSignal: AbortSignal; constructor( public readonly activity: Activity, @@ -39,13 +37,7 @@ export class ExeScriptExecutor { options?: ExecutionOptions, ) { this.options = new ExecutionConfig(options); - } - - /** - * Stops the executor mid-flight - */ - public stop() { - this.isRunning = false; + this.abortSignal = createAbortSignalFromTimeout(options?.signalOrTimeout); } /** @@ -53,29 +45,35 @@ export class ExeScriptExecutor { * * @param script - exe script request * @param stream - define type of getting results from execution (polling or streaming) - * @param timeout - execution timeout + * @param signalOrTimeout - the timeout in milliseconds or an AbortSignal that will be used to cancel the execution * @param maxRetries - maximum number of retries retrieving results when an error occurs, default: 10 */ public async execute( script: ExeScriptRequest, stream?: boolean, - timeout?: number, + signalOrTimeout?: number | AbortSignal, maxRetries?: number, ): Promise { let batchId: string, batchSize: number; - let startTime = new Date(); - this.isRunning = true; + const abortController = new AbortController(); + // abort execution in case of cancellation by global signal or by local signal (from parameter) + this.abortSignal.addEventListener("abort", () => abortController.abort(this.abortSignal.reason)); + if (signalOrTimeout) { + const abortSignal = createAbortSignalFromTimeout(signalOrTimeout); + abortSignal.addEventListener("abort", () => abortController.abort(abortSignal.reason)); + } try { + abortController.signal.throwIfAborted(); batchId = await this.send(script); - startTime = new Date(); batchSize = JSON.parse(script.text).length; + abortController.signal.throwIfAborted(); this.logger.debug(`Script sent.`, { batchId }); return stream - ? this.streamingBatch(batchId, batchSize, startTime, timeout) - : this.pollingBatch(batchId, startTime, timeout, maxRetries); + ? this.streamingBatch(batchId, batchSize, abortController.signal) + : this.pollingBatch(batchId, abortController.signal, maxRetries); } catch (error) { const message = getMessageFromApiError(error); @@ -83,8 +81,11 @@ export class ExeScriptExecutor { reason: message, }); + if (abortController.signal.aborted) { + throw new GolemAbortError("Executions of script has been aborted", this.abortSignal.reason); + } throw new GolemWorkError( - `Unable to execute script ${message}`, + `Unable to execute script. ${message}`, WorkErrorCode.ScriptExecutionFailed, this.activity.agreement, this.activity, @@ -95,15 +96,10 @@ export class ExeScriptExecutor { } protected async send(script: ExeScriptRequest): Promise { - return withTimeout(this.activityModule.executeScript(this.activity, script), this.options.activityRequestTimeout); + return withTimeout(this.activityModule.executeScript(this.activity, script), 10_000); } - private async pollingBatch( - batchId: string, - startTime: Date, - timeout?: number, - maxRetries?: number, - ): Promise { + private async pollingBatch(batchId: string, abortSignal: AbortSignal, maxRetries?: number): Promise { this.logger.debug("Starting to poll for batch results"); let isBatchFinished = false; @@ -111,32 +107,27 @@ export class ExeScriptExecutor { const { id: activityId, agreement } = this.activity; - const isRunning = () => this.isRunning; - - const { activityExecuteTimeout, activityExeBatchResultPollIntervalSeconds, activityExeBatchResultMaxRetries } = - this.options; + const { activityExeBatchResultPollIntervalSeconds, activityExeBatchResultMaxRetries } = this.options; const { logger, activity, activityModule } = this; return new Readable({ objectMode: true, - async read() { - while (!isBatchFinished) { + const abortError = new GolemAbortError("Execution of script has been aborted", abortSignal.reason); + abortSignal.addEventListener("abort", () => { + logger.warn(abortError.message, { activityId: activity.id, batchId, reason: abortSignal.reason }); + this.destroy(abortError); + }); + while (!isBatchFinished && !abortSignal.aborted) { logger.debug("Polling for batch script execution result"); - if (startTime.valueOf() + (timeout || activityExecuteTimeout) <= new Date().valueOf()) { - logger.debug("Activity probably timed-out, will stop polling for batch execution results"); - return this.destroy(new GolemTimeoutError(`Activity ${activityId} timeout.`)); - } - try { const results = await retry( async (bail, attempt) => { logger.debug(`Trying to poll for batch execution results from yagna. Attempt: ${attempt}`); try { - if (!isRunning()) { - logger.debug("Activity is no longer running, will stop polling for batch execution results"); - return bail(new GolemAbortError(`Activity ${activityId} has been interrupted.`)); + if (abortSignal.aborted) { + return bail(abortError); } return await activityModule.getBatchResults( activity, @@ -171,7 +162,10 @@ export class ExeScriptExecutor { }); } } catch (error) { - logger.error(`Processing batch execution results failed`, error); + if (abortSignal.aborted) { + return this.destroy(abortError); + } + logger.error(`Processing script execution results failed`, error); return this.destroy( error instanceof GolemWorkError @@ -187,18 +181,15 @@ export class ExeScriptExecutor { ); } } - + if (abortSignal.aborted) { + return this.destroy(abortError); + } this.push(null); }, }); } - private async streamingBatch( - batchId: string, - batchSize: number, - startTime: Date, - timeout?: number, - ): Promise { + private async streamingBatch(batchId: string, batchSize: number, abortSignal: AbortSignal): Promise { const errors: object[] = []; const results: Result[] = []; @@ -209,24 +200,22 @@ export class ExeScriptExecutor { }); let isBatchFinished = false; - const isRunning = () => this.isRunning; - const activityExecuteTimeout = this.options.activityExecuteTimeout; const { logger, activity } = this; return new Readable({ objectMode: true, async read() { - while (!isBatchFinished) { + const abortError = new GolemAbortError("Execution of script has been aborted", abortSignal.reason); + abortSignal.addEventListener("abort", () => { + logger.warn(abortError.message, { activityId: activity.id, batchId, reason: abortSignal.reason }); + this.destroy(abortError); + }); + while (!isBatchFinished && !abortSignal.aborted) { let error: Error | undefined; - if (startTime.valueOf() + (timeout || activityExecuteTimeout) <= new Date().valueOf()) { - logger.debug("Activity probably timed-out, will stop streaming batch execution results"); - error = new GolemTimeoutError(`Activity ${activity.id} timeout.`); - } - if (!isRunning()) { - logger.debug("Activity is no longer running, will stop streaming batch execution results"); - error = new GolemAbortError(`Activity ${activity.id} has been interrupted.`); + if (abortSignal.aborted) { + error = abortError; } if (errors.length) { @@ -250,8 +239,12 @@ export class ExeScriptExecutor { } await sleep(500, true); } - - this.push(null); + if (abortSignal.aborted) { + this.destroy(abortError); + return; + } else { + this.push(null); + } source.unsubscribe(); }, }); diff --git a/src/activity/work/work.ts b/src/activity/work/work.ts index feece78b4..7214f59de 100644 --- a/src/activity/work/work.ts +++ b/src/activity/work/work.ts @@ -13,12 +13,12 @@ import { UploadFile, } from "../script"; import { NullStorageProvider, StorageProvider } from "../../shared/storage"; -import { defaultLogger, Logger, sleep, YagnaOptions } from "../../shared/utils"; +import { createAbortSignalFromTimeout, defaultLogger, Logger, sleep, YagnaOptions } from "../../shared/utils"; import { Batch } from "./batch"; import { NetworkNode } from "../../network"; import { RemoteProcess } from "./process"; import { GolemWorkError, WorkErrorCode } from "./error"; -import { GolemConfigError, GolemTimeoutError } from "../../shared/error/golem-error"; +import { GolemAbortError, GolemConfigError, GolemTimeoutError } from "../../shared/error/golem-error"; import { Agreement, ProviderInfo } from "../../market/agreement"; import { TcpProxy } from "../../network/tcpProxy"; import { ExecutionOptions, ExeScriptExecutor } from "../exe-script-executor"; @@ -26,23 +26,23 @@ import { ExecutionOptions, ExeScriptExecutor } from "../exe-script-executor"; export type Worker = (ctx: WorkContext) => Promise; const DEFAULTS = { - activityPreparingTimeout: 300_000, - activityStateCheckInterval: 1000, + activityDeployingTimeout: 300_000, }; export interface WorkOptions { - activityPreparingTimeout?: number; - activityStateCheckingInterval?: number; + activityDeployingTimeout?: number; storageProvider?: StorageProvider; networkNode?: NetworkNode; logger?: Logger; activityReadySetupFunctions?: Worker[]; yagnaOptions?: YagnaOptions; execution?: ExecutionOptions; + signalOrTimeout?: number | AbortSignal; } export interface CommandOptions { - timeout?: number; + signalOrTimeout?: number | AbortSignal; + maxRetries?: number; env?: object; capture?: Capture; } @@ -57,8 +57,7 @@ export interface ActivityDTO { * Groups most common operations that the requestors might need to implement their workflows */ export class WorkContext { - private readonly activityPreparingTimeout: number; - private readonly activityStateCheckingInterval: number; + private readonly activityDeployingTimeout: number; public readonly provider: ProviderInfo; private readonly logger: Logger; @@ -67,25 +66,31 @@ export class WorkContext { private readonly networkNode?: NetworkNode; private executor: ExeScriptExecutor; + private readonly abortSignal: AbortSignal; constructor( public readonly activity: Activity, public readonly activityModule: ActivityModule, private options?: WorkOptions, ) { - this.activityPreparingTimeout = options?.activityPreparingTimeout || DEFAULTS.activityPreparingTimeout; - this.activityStateCheckingInterval = options?.activityStateCheckingInterval || DEFAULTS.activityStateCheckInterval; + this.activityDeployingTimeout = options?.activityDeployingTimeout || DEFAULTS.activityDeployingTimeout; this.logger = options?.logger ?? defaultLogger("work"); this.provider = activity.provider; this.storageProvider = options?.storageProvider ?? new NullStorageProvider(); this.networkNode = options?.networkNode; - - this.executor = this.activityModule.createScriptExecutor(this.activity, this.options?.execution); + this.abortSignal = createAbortSignalFromTimeout(options?.signalOrTimeout); + this.executor = this.activityModule.createScriptExecutor(this.activity, { + ...this.options?.execution, + signalOrTimeout: this.abortSignal, + }); } private async fetchState(): Promise { + if (this.abortSignal.aborted) { + throw new GolemAbortError("ExeUnit has been aborted"); + } return this.activityModule .refreshActivity(this.activity) .then((activity) => activity.getState()) @@ -102,84 +107,98 @@ export class WorkContext { } async before(): Promise { - let state = await this.fetchState(); - if (state === ActivityStateEnum.Ready) { - await this.setupActivity(); - return; - } - - if (state === ActivityStateEnum.Initialized) { - const result = await this.executor - .execute( - new Script([new Deploy(this.networkNode?.getNetworkConfig?.()), new Start()]).getExeScriptRequest(), - undefined, - this.activityPreparingTimeout, - ) - .catch((e) => { - throw new GolemWorkError( - `Unable to deploy activity. ${e}`, - WorkErrorCode.ActivityDeploymentFailed, - this.activity.agreement, - this.activity, - this.activity.provider, - e, - ); - }); - - let timeoutId: NodeJS.Timeout; - - await Promise.race([ - new Promise( - (res, rej) => - (timeoutId = setTimeout( - () => rej(new GolemTimeoutError("Preparing activity timeout")), - this.activityPreparingTimeout, - )), - ), - (async () => { - for await (const res of result) { - if (res.result === "Error") - throw new GolemWorkError( - `Preparing activity failed. Error: ${res.message}`, - WorkErrorCode.ActivityDeploymentFailed, - this.activity.agreement, - this.activity, - this.activity.provider, - ); - } - })(), - ]) - .catch((error) => { - if (error instanceof GolemWorkError) { - throw error; - } - throw new GolemWorkError( - `Preparing activity failed. Error: ${error.toString()}`, - WorkErrorCode.ActivityDeploymentFailed, - this.activity.agreement, - this.activity, - this.activity.provider, - error, - ); - }) - .finally(() => clearTimeout(timeoutId)); - } + try { + let state = await this.fetchState(); + if (state === ActivityStateEnum.Ready) { + await this.setupActivity(); + return; + } - await sleep(this.activityStateCheckingInterval, true); + if (state === ActivityStateEnum.Initialized) { + await this.deployActivity(); + } - state = await this.fetchState(); + await sleep(1000, true); + state = await this.fetchState(); - if (state !== ActivityStateEnum.Ready) { - throw new GolemWorkError( - `Activity ${this.activity.id} cannot reach the Ready state. Current state: ${state}`, - WorkErrorCode.ActivityDeploymentFailed, - this.activity.agreement, - this.activity, - this.activity.provider, - ); + if (state !== ActivityStateEnum.Ready) { + throw new GolemWorkError( + `Activity ${this.activity.id} cannot reach the Ready state. Current state: ${state}`, + WorkErrorCode.ActivityDeploymentFailed, + this.activity.agreement, + this.activity, + this.activity.provider, + ); + } + await this.setupActivity(); + } catch (error) { + if (this.abortSignal.aborted) { + throw this.abortSignal.reason.name === "TimeoutError" + ? new GolemTimeoutError( + "Initializing of the exe-unit has been aborted due to a timeout", + this.abortSignal.reason, + ) + : new GolemAbortError("Initializing of the exe-unit has been aborted", this.abortSignal.reason); + } + throw error; } + } - await this.setupActivity(); + private async deployActivity() { + const result = await this.executor + .execute( + new Script([new Deploy(this.networkNode?.getNetworkConfig?.()), new Start()]).getExeScriptRequest(), + undefined, + this.activityDeployingTimeout, + ) + .catch((e) => { + throw new GolemWorkError( + `Unable to deploy activity. ${e}`, + WorkErrorCode.ActivityDeploymentFailed, + this.activity.agreement, + this.activity, + this.activity.provider, + e, + ); + }); + + let timeoutId: NodeJS.Timeout; + + await Promise.race([ + new Promise( + (res, rej) => + (timeoutId = setTimeout( + () => rej(new GolemTimeoutError("Deploing activity has been aborted due to a timeout")), + this.activityDeployingTimeout, + )), + ), + (async () => { + for await (const res of result) { + if (res.result === "Error") + throw new GolemWorkError( + `Deploing activity failed. Error: ${res.message}`, + WorkErrorCode.ActivityDeploymentFailed, + this.activity.agreement, + this.activity, + this.activity.provider, + ); + } + })(), + ]) + .catch((error) => { + if (error instanceof GolemWorkError) { + throw error; + } + throw new GolemWorkError( + `Deploing activity failed. Error: ${error.toString()}`, + WorkErrorCode.ActivityDeploymentFailed, + this.activity.agreement, + this.activity, + this.activity.provider, + error, + ); + }) + .finally(() => clearTimeout(timeoutId)); } private async setupActivity() { @@ -254,7 +273,7 @@ export class WorkContext { // In this case, the script consists only of one run command, // so we skip the execution of script.before and script.after const streamOfActivityResults = await this.executor - .execute(script.getExeScriptRequest(), true, options?.timeout) + .execute(script.getExeScriptRequest(), true, options?.signalOrTimeout, options?.maxRetries) .catch((e) => { throw new GolemWorkError( `Script execution failed for command: ${JSON.stringify(run.toJson())}. ${ @@ -401,7 +420,12 @@ export class WorkContext { await sleep(100, true); // Send script. - const results = await this.executor.execute(script.getExeScriptRequest(), false, options?.timeout); + const results = await this.executor.execute( + script.getExeScriptRequest(), + false, + options?.signalOrTimeout, + options?.maxRetries, + ); // Process result. let allResults: Result[] = []; diff --git a/src/golem-network/golem-network.ts b/src/golem-network/golem-network.ts index 66f01b2e2..25579bf76 100644 --- a/src/golem-network/golem-network.ts +++ b/src/golem-network/golem-network.ts @@ -358,8 +358,9 @@ export class GolemNetwork { * ``` * * @param order + * @param signalOrTimeout - the timeout in milliseconds or an AbortSignal that will be used to cancel the lease request */ - async oneOf(order: MarketOrderSpec): Promise { + async oneOf(order: MarketOrderSpec, signalOrTimeout?: number | AbortSignal): Promise { const proposalPool = new DraftOfferProposalPool({ logger: this.logger, validateProposal: order.market.proposalFilter, @@ -377,9 +378,13 @@ export class GolemNetwork { const proposalSubscription = proposalPool.readFrom(draftProposal$); - const agreement = await this.market.signAgreementFromPool(proposalPool, { - expirationSec: order.market.rentHours * 60 * 60, - }); + const agreement = await this.market.signAgreementFromPool( + proposalPool, + { + expirationSec: order.market.rentHours * 60 * 60, + }, + signalOrTimeout, + ); const networkNode = order.network ? await this.network.createNetworkNode(order.network, agreement.provider.id) diff --git a/src/lease-process/lease-process-pool.test.ts b/src/lease-process/lease-process-pool.test.ts index 6eff64ef8..eba087de5 100644 --- a/src/lease-process/lease-process-pool.test.ts +++ b/src/lease-process/lease-process-pool.test.ts @@ -58,13 +58,13 @@ describe("LeaseProcessPool", () => { await pool.ready(); expect(pool.getAvailableSize()).toBe(5); - verify(marketModule.signAgreementFromPool(_, _)).times(5); + verify(marketModule.signAgreementFromPool(_, _, _)).times(5); }); it("retries on error", async () => { when(leaseModule.createLease(_, _, _)).thenCall(() => ({}) as LeaseProcess); const fakeAgreement = {} as Agreement; - when(marketModule.signAgreementFromPool(_, _)) + when(marketModule.signAgreementFromPool(_, _, _)) .thenResolve(fakeAgreement) .thenReject(new Error("Failed to propose agreement")) .thenResolve(fakeAgreement) @@ -76,7 +76,7 @@ describe("LeaseProcessPool", () => { await pool.ready(); expect(pool.getAvailableSize()).toBe(3); - verify(marketModule.signAgreementFromPool(_, _)).times(5); + verify(marketModule.signAgreementFromPool(_, _, _)).times(5); }); it("stops retrying after abort signal is triggered", async () => { const pool = getLeasePool({ min: 3 }); diff --git a/src/lease-process/lease-process-pool.ts b/src/lease-process/lease-process-pool.ts index a4298dd7a..752f112a1 100644 --- a/src/lease-process/lease-process-pool.ts +++ b/src/lease-process/lease-process-pool.ts @@ -105,11 +105,15 @@ export class LeaseProcessPool { })() || MAX_REPLICAS; } - private async createNewLeaseProcess() { + private async createNewLeaseProcess(signalOrTimeout?: number | AbortSignal) { this.logger.debug("Creating new lease process to add to pool"); try { this.leasesBeingSigned++; - const agreement = await this.marketModule.signAgreementFromPool(this.proposalPool, this.agreementOptions); + const agreement = await this.marketModule.signAgreementFromPool( + this.proposalPool, + this.agreementOptions, + signalOrTimeout, + ); const networkNode = this.network ? await this.networkModule.createNetworkNode(this.network, agreement.provider.id) : undefined; @@ -183,8 +187,9 @@ export class LeaseProcessPool { /** * Borrow a lease process from the pool. If there is no valid lease process a new one will be created. + * @param signalOrTimeout - the timeout in milliseconds or an AbortSignal that will be used to cancel the lease request */ - async acquire(): Promise { + async acquire(signalOrTimeout?: number | AbortSignal): Promise { if (this.isDraining) { throw new Error("The pool is in draining mode"); } @@ -193,7 +198,7 @@ export class LeaseProcessPool { if (!this.canCreateMoreLeaseProcesses()) { return this.enqueueAcquire(); } - leaseProcess = await this.createNewLeaseProcess(); + leaseProcess = await this.createNewLeaseProcess(signalOrTimeout); } this.borrowed.add(leaseProcess); this.events.emit("acquired", leaseProcess.agreement); @@ -356,9 +361,14 @@ export class LeaseProcessPool { * // even if an error is thrown in the callback * }); * ``` + * @param callback - a function that takes a `lease` object as its argument. The lease is automatically released after the callback is executed, regardless of whether it completes successfully or throws an error. + * @param signalOrTimeout - the timeout in milliseconds or an AbortSignal that will be used to cancel the lease request */ - public async withLease(callback: (lease: LeaseProcess) => Promise): Promise { - const lease = await this.acquire(); + public async withLease( + callback: (lease: LeaseProcess) => Promise, + signalOrTimeout?: number | AbortSignal, + ): Promise { + const lease = await this.acquire(signalOrTimeout); try { return await callback(lease); } finally { diff --git a/src/lease-process/lease-process.ts b/src/lease-process/lease-process.ts index 489862c04..22f7879b6 100644 --- a/src/lease-process/lease-process.ts +++ b/src/lease-process/lease-process.ts @@ -8,6 +8,7 @@ import { EventEmitter } from "eventemitter3"; import { NetworkNode } from "../network"; import { ExecutionOptions } from "../activity/exe-script-executor"; import { MarketModule } from "../market"; +import { GolemUserError } from "../shared/error/golem-error"; export interface LeaseProcessEvents { /** @@ -31,6 +32,8 @@ export class LeaseProcess { public readonly networkNode?: NetworkNode; private currentWorkContext: WorkContext | null = null; + private abortController = new AbortController(); + private finalizePromise?: Promise; public constructor( public readonly agreement: Agreement, @@ -51,26 +54,32 @@ export class LeaseProcess { * If the lease is already finalized, it will resolve immediately. */ async finalize() { - if (this.paymentProcess.isFinished()) { - return; - } - - try { - this.logger.debug("Waiting for payment process of agreement to finish", { agreementId: this.agreement.id }); - if (this.currentWorkContext) { - await this.activityModule.destroyActivity(this.currentWorkContext.activity); - if ((await this.fetchAgreementState()) !== "Terminated") { - await this.marketModule.terminateAgreement(this.agreement); + // Prevent this task from being performed more than once + if (!this.finalizePromise) { + this.finalizePromise = (async () => { + this.abortController.abort("The lease process is finalizing"); + if (this.paymentProcess.isFinished()) { + return; } - } - await waitForCondition(() => this.paymentProcess.isFinished()); - this.logger.debug("Payment process for agreement finalized", { agreementId: this.agreement.id }); - } catch (error) { - this.logger.error("Payment process finalization failed", { agreementId: this.agreement.id, error }); - throw error; - } finally { - this.events.emit("finalized"); + try { + this.logger.info("Waiting for payment process of agreement to finish", { agreementId: this.agreement.id }); + if (this.currentWorkContext) { + await this.activityModule.destroyActivity(this.currentWorkContext.activity); + } + if ((await this.fetchAgreementState()) !== "Terminated") { + await this.marketModule.terminateAgreement(this.agreement); + } + await waitForCondition(() => this.paymentProcess.isFinished()); + this.logger.info("Payment process for agreement finalized", { agreementId: this.agreement.id }); + } catch (error) { + this.logger.error("Payment process finalization failed", { agreementId: this.agreement.id, error }); + throw error; + } finally { + this.events.emit("finalized"); + } + })(); } + return this.finalizePromise; } public hasActivity(): boolean { @@ -81,6 +90,9 @@ export class LeaseProcess { * Creates an activity on the Provider, and returns a work context that can be used to operate within the activity */ async getExeUnit(): Promise { + if (this.finalizePromise || this.abortController.signal.aborted) { + throw new GolemUserError("The lease process is not active. It may have been aborted or finalized"); + } if (this.currentWorkContext) { return this.currentWorkContext; } @@ -89,7 +101,8 @@ export class LeaseProcess { this.currentWorkContext = await this.activityModule.createWorkContext(activity, { storageProvider: this.storageProvider, networkNode: this.leaseOptions?.networkNode, - execution: this.leaseOptions?.activity, + execution: { ...this.leaseOptions?.activity }, + signalOrTimeout: this.abortController.signal, }); return this.currentWorkContext; diff --git a/src/market/draft-offer-proposal-pool.test.ts b/src/market/draft-offer-proposal-pool.test.ts index 8005e82ce..e4dc54b26 100644 --- a/src/market/draft-offer-proposal-pool.test.ts +++ b/src/market/draft-offer-proposal-pool.test.ts @@ -1,6 +1,7 @@ import { DraftOfferProposalPool } from "./draft-offer-proposal-pool"; import { instance, mock, when } from "@johanblumenberg/ts-mockito"; import { OfferProposal } from "./index"; +import { GolemAbortError, GolemTimeoutError } from "../shared/error/golem-error"; describe("Draft Offer Proposal Pool", () => { // GIVEN @@ -107,6 +108,20 @@ describe("Draft Offer Proposal Pool", () => { expect(a).toBe(proposal2); }); }); + describe("Negative cases", () => { + it("should abort the acquiring proposal by timeout", async () => { + const pool = new DraftOfferProposalPool(); + await expect(pool.acquire(1)).rejects.toThrow(new GolemTimeoutError("Could not provide any proposal in time")); + }); + it("should abort the acquiring proposal by signal", async () => { + const pool = new DraftOfferProposalPool(); + const ac = new AbortController(); + ac.abort(); + await expect(pool.acquire(ac.signal)).rejects.toThrow( + new GolemAbortError("The acquiring of proposals has been aborted"), + ); + }); + }); }); describe("Is ready", () => { diff --git a/src/market/draft-offer-proposal-pool.ts b/src/market/draft-offer-proposal-pool.ts index 5c9734429..71a02a186 100644 --- a/src/market/draft-offer-proposal-pool.ts +++ b/src/market/draft-offer-proposal-pool.ts @@ -2,8 +2,9 @@ import { OfferProposal, ProposalFilter } from "./proposal/offer-proposal"; import AsyncLock from "async-lock"; import { EventEmitter } from "eventemitter3"; import { GolemMarketError, MarketErrorCode } from "./error"; -import { defaultLogger, Logger, sleep } from "../shared/utils"; +import { createAbortSignalFromTimeout, defaultLogger, Logger, sleep } from "../shared/utils"; import { Observable, Subscription } from "rxjs"; +import { GolemAbortError, GolemTimeoutError } from "../shared/error/golem-error"; export type ProposalSelector = (proposals: OfferProposal[]) => OfferProposal; @@ -27,13 +28,6 @@ export interface ProposalPoolOptions { */ minCount?: number; - /** - * Number of seconds to wait for an acquire call to finish before throwing an exception - * - * @default 30 - */ - acquireTimeoutSec?: number; - logger?: Logger; } @@ -65,9 +59,6 @@ export class DraftOfferProposalPool { /** {@link ProposalPoolOptions.minCount} */ private readonly minCount: number = 0; - /** {@link ProposalPoolOptions.acquireTimeoutSec} */ - private readonly acquireTimeoutSec: number = 30; - /** {@link ProposalPoolOptions.selectProposal} */ private readonly selectProposal: ProposalSelector = (proposals: OfferProposal[]) => proposals[0]; @@ -96,10 +87,6 @@ export class DraftOfferProposalPool { this.minCount = options.minCount; } - if (options?.acquireTimeoutSec && options.acquireTimeoutSec >= 0) { - this.acquireTimeoutSec = options?.acquireTimeoutSec; - } - this.logger = this.logger = options?.logger || defaultLogger("proposal-pool"); } @@ -119,45 +106,44 @@ export class DraftOfferProposalPool { /** * Attempts to obtain a single proposal from the pool - * - * This method will reject if no suitable proposal will be found within {@link DraftOfferProposalPool.acquireTimeoutSec} seconds. + * @param signalOrTimeout - the timeout in milliseconds or an AbortSignal that will be used to cancel the acquiring */ - public acquire(): Promise { - return this.lock.acquire( - "proposal-pool", - async () => { - let proposal: OfferProposal | null = null; - - while (proposal === null) { - // Try to get one - proposal = this.available.size > 0 ? this.selectProposal([...this.available]) : null; - - if (proposal) { - // Validate - if (!this.validateProposal(proposal)) { - // Drop if not valid - this.removeFromAvailable(proposal); - // Keep searching - proposal = null; - } - } - // if not found or not valid wait a while for next try - if (!proposal) { - await sleep(1); + public acquire(signalOrTimeout?: number | AbortSignal): Promise { + const signal = createAbortSignalFromTimeout(signalOrTimeout); + return this.lock.acquire("proposal-pool", async () => { + let proposal: OfferProposal | null = null; + + while (proposal === null) { + if (signal.aborted) { + throw signal.reason.name === "TimeoutError" + ? new GolemTimeoutError("Could not provide any proposal in time") + : new GolemAbortError("The acquiring of proposals has been aborted", signal.reason); + } + // Try to get one + proposal = this.available.size > 0 ? this.selectProposal([...this.available]) : null; + + if (proposal) { + // Validate + if (!this.validateProposal(proposal)) { + // Drop if not valid + this.removeFromAvailable(proposal); + // Keep searching + proposal = null; } } + // if not found or not valid wait a while for next try + if (!proposal) { + await sleep(1); + } + } - this.available.delete(proposal); - this.leased.add(proposal); + this.available.delete(proposal); + this.leased.add(proposal); - this.events.emit("acquired", proposal); + this.events.emit("acquired", proposal); - return proposal; - }, - { - maxOccupationTime: this.acquireTimeoutSec * 1000, - }, - ); + return proposal; + }); } /** diff --git a/src/market/market.module.test.ts b/src/market/market.module.test.ts index 311452ff4..b6e34effa 100644 --- a/src/market/market.module.test.ts +++ b/src/market/market.module.test.ts @@ -14,6 +14,7 @@ import { DraftOfferProposalPool } from "./draft-offer-proposal-pool"; import { Agreement, AgreementEvent, ProviderInfo } from "./agreement"; import { waitAndCall, waitForCondition } from "../shared/utils/wait"; import { MarketOrderSpec } from "../golem-network"; +import { GolemAbortError } from "../shared/error/golem-error"; const mockMarketApiAdapter = mock(MarketApiAdapter); const mockYagna = mock(YagnaApi); @@ -428,7 +429,7 @@ describe("Market module", () => { const badProposal1 = {} as OfferProposal; const goodProposal = {} as OfferProposal; const mockPool = mock(DraftOfferProposalPool); - when(mockPool.acquire()).thenResolve(badProposal0).thenResolve(badProposal1).thenResolve(goodProposal); + when(mockPool.acquire(_)).thenResolve(badProposal0).thenResolve(badProposal1).thenResolve(goodProposal); when(mockPool.remove(_)).thenResolve(); const goodAgreement = {} as Agreement; const marketSpy = spy(marketModule); @@ -438,7 +439,7 @@ describe("Market module", () => { const signedProposal = await marketModule.signAgreementFromPool(instance(mockPool)); - verify(mockPool.acquire()).thrice(); + verify(mockPool.acquire(_)).thrice(); verify(marketSpy.proposeAgreement(badProposal0, _)).once(); verify(mockPool.remove(badProposal0)).once(); verify(marketSpy.proposeAgreement(badProposal1, _)).once(); @@ -452,15 +453,17 @@ describe("Market module", () => { const error = new Error("Operation cancelled"); const proposal = {} as OfferProposal; const mockPool = mock(DraftOfferProposalPool); - when(mockPool.acquire()).thenCall(async () => { + when(mockPool.acquire(_)).thenCall(async () => { ac.abort(error); return proposal; }); const marketSpy = spy(marketModule); - await expect(marketModule.signAgreementFromPool(instance(mockPool), {}, ac.signal)).rejects.toThrow(error); + await expect(marketModule.signAgreementFromPool(instance(mockPool), {}, ac.signal)).rejects.toMatchError( + new GolemAbortError("The signing of the agreement has been aborted", error), + ); - verify(mockPool.acquire()).once(); + verify(mockPool.acquire(_)).once(); verify(mockPool.release(proposal)).once(); verify(mockPool.remove(_)).never(); verify(marketSpy.proposeAgreement(_)).never(); @@ -469,7 +472,7 @@ describe("Market module", () => { const mockPool = mock(DraftOfferProposalPool); const signal = AbortSignal.abort(); await expect(marketModule.signAgreementFromPool(instance(mockPool), {}, signal)).rejects.toThrow( - "This operation was aborted", + "The signing of the agreement has been aborted", ); verify(mockPool.acquire()).never(); }); @@ -481,7 +484,7 @@ describe("Market module", () => { when(marketSpy.proposeAgreement(_)).thenReject(new Error("Failed to sign proposal")); await expect(marketModule.signAgreementFromPool(instance(mockPool), {}, 50)).rejects.toThrow( - "The operation was aborted due to timeout", + "Could not sign any agreement in time", ); }); it("respects the timeout on draft proposal pool acquire and forwards the error", async () => { diff --git a/src/market/market.module.ts b/src/market/market.module.ts index 4a487fe73..544972801 100644 --- a/src/market/market.module.ts +++ b/src/market/market.module.ts @@ -36,7 +36,7 @@ import { WorkloadDemandDirector } from "./demand/directors/workload-demand-direc import { WorkloadDemandDirectorConfigOptions } from "./demand/options"; import { BasicDemandDirectorConfig } from "./demand/directors/basic-demand-director-config"; import { PaymentDemandDirectorConfig } from "./demand/directors/payment-demand-director-config"; -import { GolemUserError } from "../shared/error/golem-error"; +import { GolemAbortError, GolemTimeoutError, GolemUserError } from "../shared/error/golem-error"; import { MarketOrderSpec } from "../golem-network"; import { INetworkApi, NetworkModule } from "../network"; import { AgreementOptions } from "./agreement/agreement"; @@ -467,14 +467,27 @@ export class MarketModuleImpl implements MarketModule { ): Promise { const signal = createAbortSignalFromTimeout(signalOrTimeout); - const tryProposing = async (): Promise => { - signal.throwIfAborted(); - const proposal = await draftProposalPool.acquire(); - if (signal.aborted) { - await draftProposalPool.release(proposal); + const getProposal = async () => { + try { signal.throwIfAborted(); + const proposal = await draftProposalPool.acquire(signal); + if (signal.aborted) { + await draftProposalPool.release(proposal); + signal.throwIfAborted(); + } + return proposal; + } catch (error) { + if (signal.aborted) { + throw signal.reason.name === "TimeoutError" + ? new GolemTimeoutError("Could not sign any agreement in time") + : new GolemAbortError("The signing of the agreement has been aborted", error); + } + throw error; } + }; + const tryProposing = async (): Promise => { + const proposal = await getProposal(); try { const agreement = await this.proposeAgreement(proposal, agreementOptions); // agreement is valid, proposal can be destroyed diff --git a/tests/e2e/leaseProcessPool.spec.ts b/tests/e2e/leaseProcessPool.spec.ts index 547fb9094..50e94fb1b 100644 --- a/tests/e2e/leaseProcessPool.spec.ts +++ b/tests/e2e/leaseProcessPool.spec.ts @@ -1,5 +1,5 @@ import { Subscription } from "rxjs"; -import { Allocation, DraftOfferProposalPool, GolemNetwork } from "../../src"; +import { Allocation, DraftOfferProposalPool, GolemAbortError, GolemNetwork } from "../../src"; describe("LeaseProcessPool", () => { const glm = new GolemNetwork(); @@ -174,4 +174,30 @@ describe("LeaseProcessPool", () => { ); expect(Math.max(...poolSizesDuringWork)).toEqual(maxPoolSize); }); + + it("should abort acquiring lease process by signal", async () => { + const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 1 }); + const abortControler = new AbortController(); + abortControler.abort(); + await expect(pool.acquire(abortControler.signal)).rejects.toThrow("The signing of the agreement has been aborted"); + }); + + it("should abort acquiring lease process by timeout", async () => { + const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 1 }); + await expect(pool.acquire(1_000)).rejects.toThrow("Could not sign any agreement in time"); + }); + + it("should finalize the lease process during execution", async () => { + expect.assertions(1); + const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 1 }); + const leaseProcess = await pool.acquire(); + const exe = await leaseProcess.getExeUnit(); + return new Promise(async (res) => { + leaseProcess.events.on("finalized", async () => res(true)); + setTimeout(() => leaseProcess.finalize(), 8_000); + await expect(exe.run("sleep 10 && echo Hello World")).rejects.toThrow( + new GolemAbortError("Execution of script has been aborted"), + ); + }); + }); }); diff --git a/tests/unit/work.test.ts b/tests/unit/work.test.ts index a31b2c620..c0d049b6b 100644 --- a/tests/unit/work.test.ts +++ b/tests/unit/work.test.ts @@ -14,7 +14,6 @@ import { } from "../../src"; import { _, anyOfClass, anything, imock, instance, mock, reset, verify, when } from "@johanblumenberg/ts-mockito"; import { buildExecutorResults, buildExeScriptErrorResult, buildExeScriptSuccessResult } from "./helpers"; -import { IPv4 } from "ip-num"; import { StorageProviderDataCallback } from "../../src/shared/storage/provider"; import { ActivityApi } from "ya-ts-client"; import { ExeScriptExecutor } from "../../src/activity/exe-script-executor"; @@ -55,7 +54,7 @@ describe("Work Context", () => { describe("Executing", () => { it("should execute run command with a single parameter", async () => { - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, @@ -75,7 +74,7 @@ describe("Work Context", () => { }); it("should execute run command with multiple parameters", async () => { - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, @@ -99,7 +98,7 @@ describe("Work Context", () => { it("should execute upload file command", async () => { const worker = async (ctx: WorkContext) => ctx.uploadFile("./file.txt", "/golem/file.txt"); - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, @@ -123,7 +122,7 @@ describe("Work Context", () => { it("should execute upload json command", async () => { const worker = async (ctx: WorkContext) => ctx.uploadJson({ test: true }, "/golem/file.txt"); - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, @@ -150,7 +149,7 @@ describe("Work Context", () => { it("should execute download file command", async () => { const worker = async (ctx: WorkContext) => ctx.downloadFile("/golem/file.txt", "./file.txt"); - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, @@ -178,7 +177,7 @@ describe("Work Context", () => { const jsonStr = JSON.stringify(json); const encoded = new TextEncoder().encode(jsonStr); - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, @@ -211,7 +210,7 @@ describe("Work Context", () => { when(mockStorageProvider.receiveData(anything())).thenResolve(data.toString()); - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, @@ -248,7 +247,7 @@ describe("Work Context", () => { describe("Exec and stream", () => { it("should execute runAndStream command", async () => { const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule)); - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, @@ -269,7 +268,7 @@ describe("Work Context", () => { it("should execute transfer command", async () => { const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule)); - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0, @@ -425,7 +424,7 @@ describe("Work Context", () => { const eventDate = new Date().toISOString(); - when(mockExecutor.execute(_, _, _)).thenResolve( + when(mockExecutor.execute(_, _, _, _)).thenResolve( buildExecutorResults([ { index: 0,