Skip to content

Commit

Permalink
Merge pull request #977 from golemfactory/feature/JST-945/lease-proce…
Browse files Browse the repository at this point in the history
…ss-aborting

feat: added ability to cancel lease processes
  • Loading branch information
mgordel authored Jun 19, 2024
2 parents ecab235 + 9c60aae commit 80e8c04
Show file tree
Hide file tree
Showing 15 changed files with 362 additions and 278 deletions.
2 changes: 1 addition & 1 deletion src/activity/activity.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ export class ActivityModuleImpl implements ActivityModule {
batchId: string,
commandIndex?: number | undefined,
): Observable<StreamingBatchEvent> {
this.logger.info("Observing streaming batch events", { activityId: activity.id, batchId });
this.logger.debug("Observing streaming batch events", { activityId: activity.id, batchId });
return this.activityApi.getExecBatchEvents(activity, batchId, commandIndex).pipe(
tap(async (event) => {
this.events.emit(
Expand Down
6 changes: 0 additions & 6 deletions src/activity/config.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { ExecutionOptions } from "./exe-script-executor";

const DEFAULTS = {
activityRequestTimeout: 10000,
activityExecuteTimeout: 1000 * 60 * 5, // 5 min,
activityExeBatchResultPollIntervalSeconds: 5,
activityExeBatchResultMaxRetries: 20,
};
Expand All @@ -11,14 +9,10 @@ const DEFAULTS = {
* @internal
*/
export class ExecutionConfig {
public readonly activityRequestTimeout: number;
public readonly activityExecuteTimeout: number;
public readonly activityExeBatchResultPollIntervalSeconds: number;
public readonly activityExeBatchResultMaxRetries: number;

constructor(options?: ExecutionOptions) {
this.activityRequestTimeout = options?.activityRequestTimeout || DEFAULTS.activityRequestTimeout;
this.activityExecuteTimeout = options?.activityExecuteTimeout || DEFAULTS.activityExecuteTimeout;
this.activityExeBatchResultMaxRetries =
options?.activityExeBatchResultMaxRetries || DEFAULTS.activityExeBatchResultMaxRetries;
this.activityExeBatchResultPollIntervalSeconds =
Expand Down
33 changes: 18 additions & 15 deletions src/activity/exe-script-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Capture, Deploy, DownloadFile, Run, Script, Start, Terminate, UploadFil
import { buildExeScriptSuccessResult } from "../../tests/unit/helpers";
import { GolemWorkError, WorkErrorCode } from "./work";
import { Logger, sleep } from "../shared/utils";
import { GolemError, GolemTimeoutError } from "../shared/error/golem-error";
import { GolemAbortError, GolemError } from "../shared/error/golem-error";
import { ExeScriptExecutor } from "./exe-script-executor";
import { StorageProvider } from "../shared/storage";
import { from, of, throwError } from "rxjs";
Expand Down Expand Up @@ -86,7 +86,6 @@ describe("ExeScriptExecutor", () => {
}

await script.after([]);
await executor.stop();
});

it("should execute script and get results by events", async () => {
Expand Down Expand Up @@ -131,7 +130,6 @@ describe("ExeScriptExecutor", () => {
});
results.on("end", async () => {
await script.after([]);
await executor.stop();
expect(resultCount).toEqual(6);
return res();
});
Expand Down Expand Up @@ -229,16 +227,17 @@ describe("ExeScriptExecutor", () => {
}
expect(expectedStdout).toEqual("test");
await script.after([]);
await executor.stop();
});
});

describe("Cancelling", () => {
it("should cancel executor", async () => {
const ac = new AbortController();
const executor = new ExeScriptExecutor(
instance(mockActivity),
instance(mockActivityModule),
instance(mockLogger),
{ signalOrTimeout: ac.signal },
);
const command1 = new Deploy();
const command2 = new Start();
Expand All @@ -249,10 +248,10 @@ describe("ExeScriptExecutor", () => {
const script = Script.create([command1, command2, command3, command4, command5, command6]);
await script.before();
const results = await executor.execute(script.getExeScriptRequest(), undefined, undefined);
await executor.stop();
ac.abort();
return new Promise<void>((res) => {
results.on("error", (error) => {
expect(error.toString()).toMatch(/Error: Activity .* has been interrupted/);
expect(error).toEqual(new GolemAbortError(`Execution of script has been aborted`));
return res();
});
results.on("data", () => null);
Expand All @@ -261,10 +260,14 @@ describe("ExeScriptExecutor", () => {

it("should cancel executor while streaming batch", async () => {
when(mockActivityModule.observeStreamingBatchEvents(_, _)).thenReturn(of<StreamingBatchEvent>());
const ac = new AbortController();
const executor = new ExeScriptExecutor(
instance(mockActivity),
instance(mockActivityModule),
instance(mockLogger),
{
signalOrTimeout: ac.signal,
},
);
const command1 = new Deploy();
const command2 = new Start();
Expand All @@ -277,10 +280,10 @@ describe("ExeScriptExecutor", () => {
const script = Script.create([command1, command2, command3, command4]);
await script.before();
const results = await executor.execute(script.getExeScriptRequest(), true, undefined);
await executor.stop();
ac.abort();
return new Promise<void>((res) => {
results.on("error", (error) => {
expect(error.toString()).toMatch(/Error: Activity .* has been interrupted/);
expect(error).toEqual(new GolemAbortError(`Execution of script has been aborted`));
return res();
});
results.on("data", () => null);
Expand Down Expand Up @@ -386,7 +389,7 @@ describe("ExeScriptExecutor", () => {
.thenReject(error)
.thenResolve([testResult]);

const results = await executor.execute(script.getExeScriptRequest(), false, 1_000, 10);
const results = await executor.execute(script.getExeScriptRequest(), false, undefined, 10);

for await (const result of results) {
expect(result).toEqual(testResult);
Expand Down Expand Up @@ -446,23 +449,23 @@ describe("ExeScriptExecutor", () => {
await sleep(10, true);
return new Promise<void>((res) => {
results.on("error", (error: GolemWorkError) => {
expect(error).toBeInstanceOf(GolemTimeoutError);
expect(error.toString()).toMatch(/Error: Activity .* timeout/);
expect(error).toBeInstanceOf(GolemAbortError);
expect(error.toString()).toEqual("Error: Execution of script has been aborted");
return res();
});
// results.on("end", () => rej());
results.on("data", () => null);
});
});

it("should handle timeout error while streaming batch", async () => {
it("should handle abort error while streaming batch", async () => {
when(mockActivityModule.observeStreamingBatchEvents(anything(), anything())).thenReturn(of());
const executor = new ExeScriptExecutor(
instance(mockActivity),
instance(mockActivityModule),
instance(mockLogger),
{
activityExecuteTimeout: 1,
signalOrTimeout: 1,
},
);
const command1 = new Deploy();
Expand All @@ -478,8 +481,8 @@ describe("ExeScriptExecutor", () => {
const results = await executor.execute(script.getExeScriptRequest(), true, 800);
return new Promise<void>((res, rej) => {
results.on("error", (error: GolemError) => {
expect(error).toBeInstanceOf(GolemTimeoutError);
expect(error.toString()).toMatch(/Error: Activity .* timeout/);
expect(error).toBeInstanceOf(GolemAbortError);
expect(error.toString()).toEqual("Error: Execution of script has been aborted");
return res();
});
results.on("end", () => rej());
Expand Down
Loading

0 comments on commit 80e8c04

Please sign in to comment.