Skip to content

Commit

Permalink
Merge pull request #660 from golemfactory/bugfix/JST-357/batch-timeou…
Browse files Browse the repository at this point in the history
…t-instead-of-error

fix: fixed issue with too long result polling interval leading to batch timeout
  • Loading branch information
grisha87 authored Nov 17, 2023
2 parents 8726925 + 8d9c20e commit 2240307
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 50 deletions.
3 changes: 2 additions & 1 deletion .prettierignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
dist/
dist/
*.gvmi
75 changes: 63 additions & 12 deletions src/activity/activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { ActivityFactory } from "./factory";
import { ActivityConfig } from "./config";
import { Events } from "../events";
import { YagnaApi } from "../utils/yagna/yagna";
import { AxiosError } from "axios";

export enum ActivityStateEnum {
New = "New",
Expand All @@ -27,7 +28,7 @@ export interface ActivityOptions {
/** timeout for executing batch */
activityExecuteTimeout?: number;
/** interval for fetching batch results while polling */
activityExeBatchResultsFetchInterval?: number;
activityExeBatchResultPollIntervalSeconds?: number;
/** Logger module */
logger?: Logger;
/** Event Bus implements EventTarget */
Expand All @@ -51,8 +52,8 @@ export class Activity {
* @hidden
*/
constructor(
public readonly id,
public readonly agreementId,
public readonly id: string,
public readonly agreementId: string,
protected readonly yagnaApi: YagnaApi,
protected readonly options: ActivityConfig,
) {
Expand Down Expand Up @@ -88,6 +89,7 @@ export class Activity {
public async execute(script: ExeScriptRequest, stream?: boolean, timeout?: number): Promise<Readable> {
let batchId: string, batchSize: number;
let startTime = new Date();

try {
batchId = await this.send(script);
startTime = new Date();
Expand All @@ -96,10 +98,13 @@ export class Activity {
this.logger?.error(error?.response?.data?.message || error.message || error);
throw new Error(error);
}

this.logger?.debug(`Script sent. Batch ID: ${batchId}`);

this.options.eventTarget?.dispatchEvent(
new Events.ScriptSent({ activityId: this.id, agreementId: this.agreementId }),
);

return stream
? this.streamingBatch(batchId, batchSize, startTime, timeout)
: this.pollingBatch(batchId, startTime, timeout);
Expand Down Expand Up @@ -160,41 +165,55 @@ export class Activity {
this.logger?.debug(`Activity ${this.id} destroyed`);
}

private async pollingBatch(batchId, startTime, timeout): Promise<Readable> {
private async pollingBatch(batchId: string, startTime: Date, timeout?: number): Promise<Readable> {
this.logger?.debug("Starting to poll for batch results");
let isBatchFinished = false;
let lastIndex: number;
let retryCount = 0;
const maxRetries = 5;
const { id: activityId, agreementId } = this;
const { id: activityId, agreementId, logger } = this;
const isRunning = () => this.isRunning;
const { activityExecuteTimeout, eventTarget } = this.options;
const { activityExecuteTimeout, eventTarget, activityExeBatchResultPollIntervalSeconds } = this.options;
const api = this.yagnaApi.activity;
const handleError = this.handleError.bind(this);

return new Readable({
objectMode: true,

async read() {
while (!isBatchFinished) {
logger?.debug("Polling for batch script execution result");

if (startTime.valueOf() + (timeout || activityExecuteTimeout) <= new Date().valueOf()) {
logger?.debug("Activity probably timed-out, will stop polling for batch execution results");
return this.destroy(new Error(`Activity ${activityId} timeout.`));
}

if (!isRunning()) {
logger?.debug("Activity is no longer running, will stop polling for batch execution results");
return this.destroy(new Error(`Activity ${activityId} has been interrupted.`));
}

try {
logger?.debug("Trying to poll for batch execution results from yagna");
// This will ignore "incompatibility" between ExeScriptCommandResultResultEnum and ResultState, which both
// contain exactly the same entries, however TSC refuses to compile it as it assumes the former is dynamicaly
// contain exactly the same entries, however TSC refuses to compile it as it assumes the former is dynamically
// computed.
const { data: rawExecBachResults } = await api.control.getExecBatchResults(
activityId,
batchId,
undefined,
activityExecuteTimeout / 1000,
activityExeBatchResultPollIntervalSeconds,
{
timeout: 0,
},
);
retryCount = 0;

const newResults = rawExecBachResults.map((rawResult) => new Result(rawResult)).slice(lastIndex + 1);

logger?.debug(`Received the following batch execution results: ${JSON.stringify(newResults)}`);

if (Array.isArray(newResults) && newResults.length) {
newResults.forEach((result) => {
this.push(result);
Expand All @@ -203,6 +222,8 @@ export class Activity {
});
}
} catch (error) {
logger?.error(`Processing batch execution results failed due to ${error}`);

try {
retryCount = await handleError(error, lastIndex, retryCount, maxRetries);
} catch (error) {
Expand All @@ -211,13 +232,20 @@ export class Activity {
}
}
}

eventTarget?.dispatchEvent(new Events.ScriptExecuted({ activityId, agreementId, success: true }));

this.push(null);
},
});
}

private async streamingBatch(batchId, batchSize, startTime, timeout): Promise<Readable> {
private async streamingBatch(
batchId: string,
batchSize: number,
startTime: Date,
timeout?: number,
): Promise<Readable> {
const basePath = this.yagnaApi.yagnaOptions.basePath;
const apiKey = this.yagnaApi.yagnaOptions.apiKey;
const eventSource = new EventSource(`${basePath}/activity/${this.id}/exec/${batchId}`, {
Expand All @@ -231,6 +259,7 @@ export class Activity {
const activityId = this.id;
const isRunning = () => this.isRunning;
const activityExecuteTimeout = this.options.activityExecuteTimeout;
const { logger } = this;

const errors: object[] = [];
eventSource.addEventListener("error", (error) => errors.push(error.data.message ?? error));
Expand All @@ -243,49 +272,71 @@ export class Activity {
async read() {
while (!isBatchFinished) {
if (startTime.valueOf() + (timeout || activityExecuteTimeout) <= new Date().valueOf()) {
logger?.debug("Activity probably timed-out, will stop streaming batch execution results");
return this.destroy(new Error(`Activity ${activityId} timeout.`));
}

if (!isRunning()) {
logger?.debug("Activity is no longer running, will stop streaming batch execution results");
return this.destroy(new Error(`Activity ${activityId} has been interrupted.`));
}

if (errors.length) {
return this.destroy(new Error(`GetExecBatchResults failed due to errors: ${JSON.stringify(errors)}`));
}

if (results.length) {
const result = results.shift();
this.push(result);
isBatchFinished = result?.isBatchFinished || false;
}
await sleep(500, true);
}

this.push(null);
},
});
}

private async handleError(error, cmdIndex, retryCount, maxRetries) {
private async handleError(error: Error | AxiosError, cmdIndex: number, retryCount: number, maxRetries: number) {
if (this.isTimeoutError(error)) {
this.logger?.warn("API request timeout." + error.toString());
return retryCount;
}

const { terminated, reason, errorMessage } = await this.isTerminated();

if (terminated) {
const msg = (reason || "") + (errorMessage || "");
this.logger?.warn(`Activity ${this.id} terminated by provider. ${msg ? "Reason: " + msg : ""}`);
throw error;
}

++retryCount;

const failMsg = "There was an error retrieving activity results.";
const errorMsg = error?.response?.data?.message || error?.message || error;
const errorMsg = this.getErrorMsg(error);

if (retryCount < maxRetries) {
this.logger?.debug(`${failMsg} Retrying in ${this.options.activityExeBatchResultsFetchInterval}.`);
this.logger?.debug(`${failMsg} Retrying in ${this.options.activityExeBatchResultPollIntervalSeconds} seconds.`);
return retryCount;
} else {
this.logger?.warn(`${failMsg} Giving up after ${retryCount} attempts. ${errorMsg}`);
}

throw new Error(`Command #${cmdIndex || 0} getExecBatchResults error: ${errorMsg}`);
}

private getErrorMsg(error: Error | AxiosError<{ message: string }>) {
if ("response" in error && error.response !== undefined) {
return error.response.data.message;
} else if ("message" in error) {
return error.message;
} else {
return error;
}
}

private isTimeoutError(error) {
const timeoutMsg = error.message && error.message.includes("timeout");
return (
Expand Down
8 changes: 4 additions & 4 deletions src/activity/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Logger } from "../utils";
const DEFAULTS = {
activityRequestTimeout: 10000,
activityExecuteTimeout: 1000 * 60 * 5, // 5 min,
activityExeBatchResultsFetchInterval: 20000,
activityExeBatchResultPollIntervalSeconds: 5,
};

/**
Expand All @@ -13,15 +13,15 @@ const DEFAULTS = {
export class ActivityConfig {
public readonly activityRequestTimeout: number;
public readonly activityExecuteTimeout: number;
public readonly activityExeBatchResultsFetchInterval: number;
public readonly activityExeBatchResultPollIntervalSeconds: number;
public readonly logger?: Logger;
public readonly eventTarget?: EventTarget;

constructor(options?: ActivityOptions) {
this.activityRequestTimeout = options?.activityRequestTimeout || DEFAULTS.activityRequestTimeout;
this.activityExecuteTimeout = options?.activityExecuteTimeout || DEFAULTS.activityExecuteTimeout;
this.activityExeBatchResultsFetchInterval =
options?.activityExeBatchResultsFetchInterval || DEFAULTS.activityExeBatchResultsFetchInterval;
this.activityExeBatchResultPollIntervalSeconds =
options?.activityExeBatchResultPollIntervalSeconds || DEFAULTS.activityExeBatchResultPollIntervalSeconds;
this.logger = options?.logger;
this.eventTarget = options?.eventTarget;
}
Expand Down
2 changes: 1 addition & 1 deletion src/executor/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export type ExecutorOptions = {
storageProvider?: StorageProvider;
/**
* @deprecated this parameter will be removed in the next version.
* Currently has no effect on executor termination.
* Currently, has no effect on executor termination.
*/
isSubprocess?: boolean;
/** Timeout for preparing activity - creating and deploy commands */
Expand Down
4 changes: 3 additions & 1 deletion src/script/script.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ import { Result } from "../activity";
* @hidden
*/
export class Script {
constructor(private commands: Command[] = []) {}

static create(commands?: Command[]): Script {
return new Script(commands);
}

constructor(private commands: Command[] = []) {}
add(command: Command) {
this.commands.push(command);
}

async before() {
await Promise.all(this.commands.map((cmd) => cmd.before()));
}
Expand Down
69 changes: 39 additions & 30 deletions src/task/batch.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { DownloadFile, Run, Script, Transfer, UploadFile } from "../script";
import { Activity, Result } from "../activity";
import { StorageProvider } from "../storage/provider";
import { Logger, sleep } from "../utils";
import { Readable, Transform, pipeline } from "stream";
import { Logger } from "../utils";
import { pipeline, Readable, Transform } from "stream";
import { UploadData } from "../script/command";

export class Batch {
Expand Down Expand Up @@ -72,40 +72,49 @@ export class Batch {

/**
* Executes the batch of commands added via {@link run} returning result for each of the steps.
*
* In case any of the commands will fail, the execution of the batch will be interrupted by the Provider.
*/
async end(): Promise<Result[]> {
await this.script.before();
await sleep(100, true);
let results: Readable;

try {
results = await this.activity.execute(this.script.getExeScriptRequest());
} catch (error) {
// the original error is more important than the one from after()
await this.script.after([]).catch();
throw error;
}
const allResults: Result[] = [];
return new Promise((resolve, reject) => {
results.on("data", (res) => {
allResults.push(res);
});
const allResults: Result[] = [];
const script = this.script.getExeScriptRequest();

results.on("end", () => {
this.script
.after(allResults)
.then((results) => resolve(results))
.catch((error) => reject(error));
});
this.logger?.debug(`Sending exec script request to the exe-unit on provider: ${JSON.stringify(script)}`);
const results = await this.activity.execute(script);

return new Promise((resolve, reject) => {
this.logger?.debug("Reading the results of the batch script");

results.on("data", (res) => {
this.logger?.debug(`Received data for batch script execution ${JSON.stringify(res)}`);

allResults.push(res);
});

results.on("end", () => {
this.logger?.debug("End of batch script execution");
this.script
.after(allResults)
.then((results) => resolve(results))
.catch((error) => reject(error));
});

results.on("error", (error) => {
this.script
.after(allResults)
.then(() => reject(error))
.catch(() => reject(error)); // Return original error, as it might be more important.
results.on("error", (error) => {
this.logger?.debug("Error in batch script execution");
this.script
.after(allResults)
.then(() => reject(error))
.catch(() => reject(error)); // Return original error, as it might be more important.
});
});
});
} catch (error) {
this.logger?.error(`Failed to send the exec script to the exe-unit on provider: ${error.toString()}`);
// NOTE: This is called only to ensure that each of the commands in the original script will be populated with at least `EmptyErrorResult`.
// That's actually a FIXME, as the command could start with an empty result, which eventually will get replaced with an actual one.
await this.script.after([]);
throw error;
}
}

async endStream(): Promise<Readable> {
Expand All @@ -116,7 +125,7 @@ export class Batch {
results = await this.activity.execute(this.script.getExeScriptRequest());
} catch (error) {
// the original error is more important than the one from after()
await script.after([]).catch();
await script.after([]);
throw error;
}
const decodedResults: Result[] = [];
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/activity.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ describe("Activity", () => {

it("should handle gsb error", async () => {
const activity = await Activity.create("test_id", yagnaApi, {
activityExeBatchResultsFetchInterval: 10,
activityExeBatchResultPollIntervalSeconds: 10,
});
const command1 = new Deploy();
const command2 = new Start();
Expand Down

0 comments on commit 2240307

Please sign in to comment.