diff --git a/src/activity/activity.ts b/src/activity/activity.ts index 1859b2df7..dad5fe170 100644 --- a/src/activity/activity.ts +++ b/src/activity/activity.ts @@ -185,17 +185,17 @@ export class Activity { logger?.debug("Polling for batch script execution result"); if (startTime.valueOf() + (timeout || activityExecuteTimeout) <= new Date().valueOf()) { - logger?.debug("Activity probably timed-out, will terminate"); + logger?.debug("Activity probably timed-out, will stop polling for batch execution results"); return this.destroy(new Error(`Activity ${activityId} timeout.`)); } if (!isRunning()) { - logger?.debug("Activity is no longer running, will terminate"); + logger?.debug("Activity is no longer running, will stop polling for batch execution results"); return this.destroy(new Error(`Activity ${activityId} has been interrupted.`)); } try { - logger?.debug("Trying to get the results from the API"); + logger?.debug("Trying to poll for batch execution results from yagna"); // This will ignore "incompatibility" between ExeScriptCommandResultResultEnum and ResultState, which both // contain exactly the same entries, however TSC refuses to compile it as it assumes the former is dynamically // computed. @@ -212,7 +212,7 @@ export class Activity { const newResults = rawExecBachResults.map((rawResult) => new Result(rawResult)).slice(lastIndex + 1); - logger?.debug(`Received the following results: ${JSON.stringify(newResults)}`); + logger?.debug(`Received the following batch execution results: ${JSON.stringify(newResults)}`); if (Array.isArray(newResults) && newResults.length) { newResults.forEach((result) => { @@ -222,7 +222,7 @@ export class Activity { }); } } catch (error) { - logger?.error("Processing results received from the API failed"); + logger?.error(`Processing batch execution results failed due to ${error}`); try { retryCount = await handleError(error, lastIndex, retryCount, maxRetries); @@ -259,6 +259,7 @@ export class Activity { const activityId = this.id; const isRunning = () => this.isRunning; const activityExecuteTimeout = this.options.activityExecuteTimeout; + const { logger } = this; const errors: object[] = []; eventSource.addEventListener("error", (error) => errors.push(error.data.message ?? error)); @@ -271,14 +272,19 @@ export class Activity { async read() { while (!isBatchFinished) { if (startTime.valueOf() + (timeout || activityExecuteTimeout) <= new Date().valueOf()) { + logger?.debug("Activity probably timed-out, will stop streaming batch execution results"); return this.destroy(new Error(`Activity ${activityId} timeout.`)); } + if (!isRunning()) { + logger?.debug("Activity is no longer running, will stop streaming batch execution results"); return this.destroy(new Error(`Activity ${activityId} has been interrupted.`)); } + if (errors.length) { return this.destroy(new Error(`GetExecBatchResults failed due to errors: ${JSON.stringify(errors)}`)); } + if (results.length) { const result = results.shift(); this.push(result); @@ -286,6 +292,7 @@ export class Activity { } await sleep(500, true); } + this.push(null); }, }); @@ -296,21 +303,27 @@ export class Activity { this.logger?.warn("API request timeout." + error.toString()); return retryCount; } + const { terminated, reason, errorMessage } = await this.isTerminated(); + if (terminated) { const msg = (reason || "") + (errorMessage || ""); this.logger?.warn(`Activity ${this.id} terminated by provider. ${msg ? "Reason: " + msg : ""}`); throw error; } + ++retryCount; + const failMsg = "There was an error retrieving activity results."; const errorMsg = this.getErrorMsg(error); + if (retryCount < maxRetries) { this.logger?.debug(`${failMsg} Retrying in ${this.options.activityExeBatchResultPollIntervalSeconds} seconds.`); return retryCount; } else { this.logger?.warn(`${failMsg} Giving up after ${retryCount} attempts. ${errorMsg}`); } + throw new Error(`Command #${cmdIndex || 0} getExecBatchResults error: ${errorMsg}`); }