From 418874f916f7a1bea2a722bdb23a3a68ab9a2d2f Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Wed, 15 Nov 2023 21:36:11 +0100 Subject: [PATCH 1/2] fix: fixed issue with too long result polling interval leading to batch timeout JST-357 --- .prettierignore | 3 +- src/activity/activity.ts | 62 ++++++++++++++++++++++++++------- src/activity/config.ts | 8 ++--- src/executor/executor.ts | 2 +- src/script/script.ts | 4 ++- src/task/batch.ts | 69 +++++++++++++++++++++---------------- tests/unit/activity.test.ts | 2 +- 7 files changed, 100 insertions(+), 50 deletions(-) diff --git a/.prettierignore b/.prettierignore index 77738287f..5dba0d190 100644 --- a/.prettierignore +++ b/.prettierignore @@ -1 +1,2 @@ -dist/ \ No newline at end of file +dist/ +*.gvmi \ No newline at end of file diff --git a/src/activity/activity.ts b/src/activity/activity.ts index 4aaa9bac4..1859b2df7 100644 --- a/src/activity/activity.ts +++ b/src/activity/activity.ts @@ -7,6 +7,7 @@ import { ActivityFactory } from "./factory"; import { ActivityConfig } from "./config"; import { Events } from "../events"; import { YagnaApi } from "../utils/yagna/yagna"; +import { AxiosError } from "axios"; export enum ActivityStateEnum { New = "New", @@ -27,7 +28,7 @@ export interface ActivityOptions { /** timeout for executing batch */ activityExecuteTimeout?: number; /** interval for fetching batch results while polling */ - activityExeBatchResultsFetchInterval?: number; + activityExeBatchResultPollIntervalSeconds?: number; /** Logger module */ logger?: Logger; /** Event Bus implements EventTarget */ @@ -51,8 +52,8 @@ export class Activity { * @hidden */ constructor( - public readonly id, - public readonly agreementId, + public readonly id: string, + public readonly agreementId: string, protected readonly yagnaApi: YagnaApi, protected readonly options: ActivityConfig, ) { @@ -88,6 +89,7 @@ export class Activity { public async execute(script: ExeScriptRequest, stream?: boolean, timeout?: number): Promise { let batchId: string, batchSize: number; let startTime = new Date(); + try { batchId = await this.send(script); startTime = new Date(); @@ -96,10 +98,13 @@ export class Activity { this.logger?.error(error?.response?.data?.message || error.message || error); throw new Error(error); } + this.logger?.debug(`Script sent. Batch ID: ${batchId}`); + this.options.eventTarget?.dispatchEvent( new Events.ScriptSent({ activityId: this.id, agreementId: this.agreementId }), ); + return stream ? this.streamingBatch(batchId, batchSize, startTime, timeout) : this.pollingBatch(batchId, startTime, timeout); @@ -160,41 +165,55 @@ export class Activity { this.logger?.debug(`Activity ${this.id} destroyed`); } - private async pollingBatch(batchId, startTime, timeout): Promise { + private async pollingBatch(batchId: string, startTime: Date, timeout?: number): Promise { + this.logger?.debug("Starting to poll for batch results"); let isBatchFinished = false; let lastIndex: number; let retryCount = 0; const maxRetries = 5; - const { id: activityId, agreementId } = this; + const { id: activityId, agreementId, logger } = this; const isRunning = () => this.isRunning; - const { activityExecuteTimeout, eventTarget } = this.options; + const { activityExecuteTimeout, eventTarget, activityExeBatchResultPollIntervalSeconds } = this.options; const api = this.yagnaApi.activity; const handleError = this.handleError.bind(this); + return new Readable({ objectMode: true, + async read() { while (!isBatchFinished) { + logger?.debug("Polling for batch script execution result"); + if (startTime.valueOf() + (timeout || activityExecuteTimeout) <= new Date().valueOf()) { + logger?.debug("Activity probably timed-out, will terminate"); return this.destroy(new Error(`Activity ${activityId} timeout.`)); } + if (!isRunning()) { + logger?.debug("Activity is no longer running, will terminate"); return this.destroy(new Error(`Activity ${activityId} has been interrupted.`)); } + try { + logger?.debug("Trying to get the results from the API"); // This will ignore "incompatibility" between ExeScriptCommandResultResultEnum and ResultState, which both - // contain exactly the same entries, however TSC refuses to compile it as it assumes the former is dynamicaly + // contain exactly the same entries, however TSC refuses to compile it as it assumes the former is dynamically // computed. const { data: rawExecBachResults } = await api.control.getExecBatchResults( activityId, batchId, undefined, - activityExecuteTimeout / 1000, + activityExeBatchResultPollIntervalSeconds, { timeout: 0, }, ); retryCount = 0; + const newResults = rawExecBachResults.map((rawResult) => new Result(rawResult)).slice(lastIndex + 1); + + logger?.debug(`Received the following results: ${JSON.stringify(newResults)}`); + if (Array.isArray(newResults) && newResults.length) { newResults.forEach((result) => { this.push(result); @@ -203,6 +222,8 @@ export class Activity { }); } } catch (error) { + logger?.error("Processing results received from the API failed"); + try { retryCount = await handleError(error, lastIndex, retryCount, maxRetries); } catch (error) { @@ -211,13 +232,20 @@ export class Activity { } } } + eventTarget?.dispatchEvent(new Events.ScriptExecuted({ activityId, agreementId, success: true })); + this.push(null); }, }); } - private async streamingBatch(batchId, batchSize, startTime, timeout): Promise { + private async streamingBatch( + batchId: string, + batchSize: number, + startTime: Date, + timeout?: number, + ): Promise { const basePath = this.yagnaApi.yagnaOptions.basePath; const apiKey = this.yagnaApi.yagnaOptions.apiKey; const eventSource = new EventSource(`${basePath}/activity/${this.id}/exec/${batchId}`, { @@ -263,7 +291,7 @@ export class Activity { }); } - private async handleError(error, cmdIndex, retryCount, maxRetries) { + private async handleError(error: Error | AxiosError, cmdIndex: number, retryCount: number, maxRetries: number) { if (this.isTimeoutError(error)) { this.logger?.warn("API request timeout." + error.toString()); return retryCount; @@ -276,9 +304,9 @@ export class Activity { } ++retryCount; const failMsg = "There was an error retrieving activity results."; - const errorMsg = error?.response?.data?.message || error?.message || error; + const errorMsg = this.getErrorMsg(error); if (retryCount < maxRetries) { - this.logger?.debug(`${failMsg} Retrying in ${this.options.activityExeBatchResultsFetchInterval}.`); + this.logger?.debug(`${failMsg} Retrying in ${this.options.activityExeBatchResultPollIntervalSeconds} seconds.`); return retryCount; } else { this.logger?.warn(`${failMsg} Giving up after ${retryCount} attempts. ${errorMsg}`); @@ -286,6 +314,16 @@ export class Activity { throw new Error(`Command #${cmdIndex || 0} getExecBatchResults error: ${errorMsg}`); } + private getErrorMsg(error: Error | AxiosError<{ message: string }>) { + if ("response" in error && error.response !== undefined) { + return error.response.data.message; + } else if ("message" in error) { + return error.message; + } else { + return error; + } + } + private isTimeoutError(error) { const timeoutMsg = error.message && error.message.includes("timeout"); return ( diff --git a/src/activity/config.ts b/src/activity/config.ts index f4d4ed97d..10875715f 100644 --- a/src/activity/config.ts +++ b/src/activity/config.ts @@ -4,7 +4,7 @@ import { Logger } from "../utils"; const DEFAULTS = { activityRequestTimeout: 10000, activityExecuteTimeout: 1000 * 60 * 5, // 5 min, - activityExeBatchResultsFetchInterval: 20000, + activityExeBatchResultPollIntervalSeconds: 5, }; /** @@ -13,15 +13,15 @@ const DEFAULTS = { export class ActivityConfig { public readonly activityRequestTimeout: number; public readonly activityExecuteTimeout: number; - public readonly activityExeBatchResultsFetchInterval: number; + public readonly activityExeBatchResultPollIntervalSeconds: number; public readonly logger?: Logger; public readonly eventTarget?: EventTarget; constructor(options?: ActivityOptions) { this.activityRequestTimeout = options?.activityRequestTimeout || DEFAULTS.activityRequestTimeout; this.activityExecuteTimeout = options?.activityExecuteTimeout || DEFAULTS.activityExecuteTimeout; - this.activityExeBatchResultsFetchInterval = - options?.activityExeBatchResultsFetchInterval || DEFAULTS.activityExeBatchResultsFetchInterval; + this.activityExeBatchResultPollIntervalSeconds = + options?.activityExeBatchResultPollIntervalSeconds || DEFAULTS.activityExeBatchResultPollIntervalSeconds; this.logger = options?.logger; this.eventTarget = options?.eventTarget; } diff --git a/src/executor/executor.ts b/src/executor/executor.ts index f454ede24..792c33c4b 100644 --- a/src/executor/executor.ts +++ b/src/executor/executor.ts @@ -44,7 +44,7 @@ export type ExecutorOptions = { storageProvider?: StorageProvider; /** * @deprecated this parameter will be removed in the next version. - * Currently has no effect on executor termination. + * Currently, has no effect on executor termination. */ isSubprocess?: boolean; /** Timeout for preparing activity - creating and deploy commands */ diff --git a/src/script/script.ts b/src/script/script.ts index e3f9df954..1bca78375 100644 --- a/src/script/script.ts +++ b/src/script/script.ts @@ -6,14 +6,16 @@ import { Result } from "../activity"; * @hidden */ export class Script { + constructor(private commands: Command[] = []) {} + static create(commands?: Command[]): Script { return new Script(commands); } - constructor(private commands: Command[] = []) {} add(command: Command) { this.commands.push(command); } + async before() { await Promise.all(this.commands.map((cmd) => cmd.before())); } diff --git a/src/task/batch.ts b/src/task/batch.ts index 0bfde823f..2418f330c 100644 --- a/src/task/batch.ts +++ b/src/task/batch.ts @@ -1,8 +1,8 @@ import { DownloadFile, Run, Script, Transfer, UploadFile } from "../script"; import { Activity, Result } from "../activity"; import { StorageProvider } from "../storage/provider"; -import { Logger, sleep } from "../utils"; -import { Readable, Transform, pipeline } from "stream"; +import { Logger } from "../utils"; +import { pipeline, Readable, Transform } from "stream"; import { UploadData } from "../script/command"; export class Batch { @@ -72,40 +72,49 @@ export class Batch { /** * Executes the batch of commands added via {@link run} returning result for each of the steps. - * - * In case any of the commands will fail, the execution of the batch will be interrupted by the Provider. */ async end(): Promise { await this.script.before(); - await sleep(100, true); - let results: Readable; + try { - results = await this.activity.execute(this.script.getExeScriptRequest()); - } catch (error) { - // the original error is more important than the one from after() - await this.script.after([]).catch(); - throw error; - } - const allResults: Result[] = []; - return new Promise((resolve, reject) => { - results.on("data", (res) => { - allResults.push(res); - }); + const allResults: Result[] = []; + const script = this.script.getExeScriptRequest(); - results.on("end", () => { - this.script - .after(allResults) - .then((results) => resolve(results)) - .catch((error) => reject(error)); - }); + this.logger?.debug(`Sending exec script request to the exe-unit on provider: ${JSON.stringify(script)}`); + const results = await this.activity.execute(script); + + return new Promise((resolve, reject) => { + this.logger?.debug("Reading the results of the batch script"); + + results.on("data", (res) => { + this.logger?.debug(`Received data for batch script execution ${JSON.stringify(res)}`); + + allResults.push(res); + }); + + results.on("end", () => { + this.logger?.debug("End of batch script execution"); + this.script + .after(allResults) + .then((results) => resolve(results)) + .catch((error) => reject(error)); + }); - results.on("error", (error) => { - this.script - .after(allResults) - .then(() => reject(error)) - .catch(() => reject(error)); // Return original error, as it might be more important. + results.on("error", (error) => { + this.logger?.debug("Error in batch script execution"); + this.script + .after(allResults) + .then(() => reject(error)) + .catch(() => reject(error)); // Return original error, as it might be more important. + }); }); - }); + } catch (error) { + this.logger?.error(`Failed to send the exec script to the exe-unit on provider: ${error.toString()}`); + // NOTE: This is called only to ensure that each of the commands in the original script will be populated with at least `EmptyErrorResult`. + // That's actually a FIXME, as the command could start with an empty result, which eventually will get replaced with an actual one. + await this.script.after([]); + throw error; + } } async endStream(): Promise { @@ -116,7 +125,7 @@ export class Batch { results = await this.activity.execute(this.script.getExeScriptRequest()); } catch (error) { // the original error is more important than the one from after() - await script.after([]).catch(); + await script.after([]); throw error; } const decodedResults: Result[] = []; diff --git a/tests/unit/activity.test.ts b/tests/unit/activity.test.ts index 3551b66c1..3626e3f1b 100644 --- a/tests/unit/activity.test.ts +++ b/tests/unit/activity.test.ts @@ -252,7 +252,7 @@ describe("Activity", () => { it("should handle gsb error", async () => { const activity = await Activity.create("test_id", yagnaApi, { - activityExeBatchResultsFetchInterval: 10, + activityExeBatchResultPollIntervalSeconds: 10, }); const command1 = new Deploy(); const command2 = new Start(); From 8d9c20ea4d3adb1cd56ac755f3451789f1d03b40 Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Thu, 16 Nov 2023 12:48:26 +0100 Subject: [PATCH 2/2] refactor: adjusted logs to be more developer friendly --- src/activity/activity.ts | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/src/activity/activity.ts b/src/activity/activity.ts index 1859b2df7..dad5fe170 100644 --- a/src/activity/activity.ts +++ b/src/activity/activity.ts @@ -185,17 +185,17 @@ export class Activity { logger?.debug("Polling for batch script execution result"); if (startTime.valueOf() + (timeout || activityExecuteTimeout) <= new Date().valueOf()) { - logger?.debug("Activity probably timed-out, will terminate"); + logger?.debug("Activity probably timed-out, will stop polling for batch execution results"); return this.destroy(new Error(`Activity ${activityId} timeout.`)); } if (!isRunning()) { - logger?.debug("Activity is no longer running, will terminate"); + logger?.debug("Activity is no longer running, will stop polling for batch execution results"); return this.destroy(new Error(`Activity ${activityId} has been interrupted.`)); } try { - logger?.debug("Trying to get the results from the API"); + logger?.debug("Trying to poll for batch execution results from yagna"); // This will ignore "incompatibility" between ExeScriptCommandResultResultEnum and ResultState, which both // contain exactly the same entries, however TSC refuses to compile it as it assumes the former is dynamically // computed. @@ -212,7 +212,7 @@ export class Activity { const newResults = rawExecBachResults.map((rawResult) => new Result(rawResult)).slice(lastIndex + 1); - logger?.debug(`Received the following results: ${JSON.stringify(newResults)}`); + logger?.debug(`Received the following batch execution results: ${JSON.stringify(newResults)}`); if (Array.isArray(newResults) && newResults.length) { newResults.forEach((result) => { @@ -222,7 +222,7 @@ export class Activity { }); } } catch (error) { - logger?.error("Processing results received from the API failed"); + logger?.error(`Processing batch execution results failed due to ${error}`); try { retryCount = await handleError(error, lastIndex, retryCount, maxRetries); @@ -259,6 +259,7 @@ export class Activity { const activityId = this.id; const isRunning = () => this.isRunning; const activityExecuteTimeout = this.options.activityExecuteTimeout; + const { logger } = this; const errors: object[] = []; eventSource.addEventListener("error", (error) => errors.push(error.data.message ?? error)); @@ -271,14 +272,19 @@ export class Activity { async read() { while (!isBatchFinished) { if (startTime.valueOf() + (timeout || activityExecuteTimeout) <= new Date().valueOf()) { + logger?.debug("Activity probably timed-out, will stop streaming batch execution results"); return this.destroy(new Error(`Activity ${activityId} timeout.`)); } + if (!isRunning()) { + logger?.debug("Activity is no longer running, will stop streaming batch execution results"); return this.destroy(new Error(`Activity ${activityId} has been interrupted.`)); } + if (errors.length) { return this.destroy(new Error(`GetExecBatchResults failed due to errors: ${JSON.stringify(errors)}`)); } + if (results.length) { const result = results.shift(); this.push(result); @@ -286,6 +292,7 @@ export class Activity { } await sleep(500, true); } + this.push(null); }, }); @@ -296,21 +303,27 @@ export class Activity { this.logger?.warn("API request timeout." + error.toString()); return retryCount; } + const { terminated, reason, errorMessage } = await this.isTerminated(); + if (terminated) { const msg = (reason || "") + (errorMessage || ""); this.logger?.warn(`Activity ${this.id} terminated by provider. ${msg ? "Reason: " + msg : ""}`); throw error; } + ++retryCount; + const failMsg = "There was an error retrieving activity results."; const errorMsg = this.getErrorMsg(error); + if (retryCount < maxRetries) { this.logger?.debug(`${failMsg} Retrying in ${this.options.activityExeBatchResultPollIntervalSeconds} seconds.`); return retryCount; } else { this.logger?.warn(`${failMsg} Giving up after ${retryCount} attempts. ${errorMsg}`); } + throw new Error(`Command #${cmdIndex || 0} getExecBatchResults error: ${errorMsg}`); }