From bf67242ad1245a31213a0e17a79e75ffa7f1f003 Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Mon, 6 Nov 2023 12:54:05 +0100 Subject: [PATCH] chore: refactored runAndStream to spawn and RemoteProcess --- examples/package.json | 2 +- examples/simple-usage/runStream.ts | 8 ----- examples/simple-usage/spawn.ts | 15 +++++++++ src/task/spawn.ts | 53 ++++++++++++++++++++++++++++++ src/task/work.spec.ts | 11 ------- src/task/work.ts | 36 ++++++++++---------- tests/e2e/tasks.spec.ts | 24 +++++++------- 7 files changed, 100 insertions(+), 49 deletions(-) delete mode 100644 examples/simple-usage/runStream.ts create mode 100644 examples/simple-usage/spawn.ts create mode 100644 src/task/spawn.ts diff --git a/examples/package.json b/examples/package.json index 3c0a01122..6e36547ee 100644 --- a/examples/package.json +++ b/examples/package.json @@ -7,7 +7,7 @@ "scripts": { "hello": "node ./hello-world/hello.mjs", "run": "ts-node ./simple-usage/run.ts", - "runStream": "ts-node ./simple-usage/runStream.ts", + "spawn": "ts-node ./simple-usage/spawn.ts", "map": "ts-node ./simple-usage/map.ts", "forEach": "ts-node ./simple-usage/forEach.ts", "fileTransfer": "ts-node ./simple-usage/fileTransfer.ts", diff --git a/examples/simple-usage/runStream.ts b/examples/simple-usage/runStream.ts deleted file mode 100644 index ee8ac3cd3..000000000 --- a/examples/simple-usage/runStream.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { TaskExecutor } from "@golem-sdk/golem-js"; - -const executor = await TaskExecutor.create("golem/alpine:latest"); -const streamOfResults = await executor.run(async (ctx) => ctx.runAndStream("while sleep 1; do date; done")); -streamOfResults?.on("data", (data) => console.log(data.stdout)); -streamOfResults?.on("error", () => executor.end()); - -setTimeout(() => executor.end(), 10_000); diff --git a/examples/simple-usage/spawn.ts b/examples/simple-usage/spawn.ts new file mode 100644 index 000000000..9aafc7329 --- /dev/null +++ b/examples/simple-usage/spawn.ts @@ -0,0 +1,15 @@ +import { TaskExecutor } from "@golem-sdk/golem-js"; + +const executor = await TaskExecutor.create("golem/alpine:latest"); +const finalResult = await executor.run(async (ctx) => { + const remoteProcess = await ctx.spawn("sleep 1 && echo 'Hello World' && echo 'Hello Golem'"); + remoteProcess.stdout.on("data", (data) => console.log("stdout>", data)); + remoteProcess.stderr.on("data", (data) => console.error("stderr>", data)); + + const finalResult = await remoteProcess.waitForExit(); + return finalResult; +}); + +console.log(finalResult); + +await executor.end(); diff --git a/src/task/spawn.ts b/src/task/spawn.ts new file mode 100644 index 000000000..1fd40035c --- /dev/null +++ b/src/task/spawn.ts @@ -0,0 +1,53 @@ +import { Readable, Transform } from "stream"; +import { Result } from "../activity"; + +export class RemoteProcess { + readonly stdout: Readable; + readonly stderr: Readable; + private lastResult?: Result; + private streamError?: Error; + private defaultTimeout = 20_000; + constructor(private streamOfActivityResults: Readable) { + this.streamOfActivityResults.on("data", (data) => (this.lastResult = data)); + this.streamOfActivityResults.on("error", (error) => (this.streamError = error)); + const { stdout, stderr } = this.transformResultsStream(); + this.stdout = stdout; + this.stderr = stderr; + } + + waitForExit(timeout?: number): Promise { + return new Promise((res, rej) => { + const timeoutId = setTimeout( + () => rej(new Error("The waiting time for the final result has been exceeded")), + timeout ?? this.defaultTimeout, + ); + this.streamOfActivityResults.on("close", () => { + clearTimeout(timeoutId); + if (this.lastResult) { + res(this.lastResult); + } else { + rej(new Error(`An error occurred while retrieving the results. ${this.streamError}`)); + } + }); + }); + } + + private transformResultsStream(): { stdout: Readable; stderr: Readable } { + const stdoutTransform = new Transform({ + objectMode: true, + transform(chunk, encoding, callback) { + callback(null, chunk?.stdout ?? null); + }, + }); + const stderrTransform = new Transform({ + objectMode: true, + transform(chunk, encoding, callback) { + callback(null, chunk?.stderr ?? null); + }, + }); + return { + stdout: this.streamOfActivityResults.pipe(stdoutTransform), + stderr: this.streamOfActivityResults.pipe(stderrTransform), + }; + } +} diff --git a/src/task/work.spec.ts b/src/task/work.spec.ts index 11c4a5491..362d07de3 100644 --- a/src/task/work.spec.ts +++ b/src/task/work.spec.ts @@ -46,17 +46,6 @@ describe("Work Context", () => { }); }); - describe("runAndStream()", () => { - it("should execute runAndStream command", async () => { - const expectedResult = ActivityMock.createResult({ stdout: "Ok" }); - activity.mockResults([expectedResult]); - const streamOfResults = await context.runAndStream("rm -rf"); - for await (const result of streamOfResults) { - expect(result).toBe(expectedResult); - } - }); - }); - describe("transfer()", () => { it("should execute transfer command", async () => { const result = ActivityMock.createResult({ stdout: "Ok" }); diff --git a/src/task/work.ts b/src/task/work.ts index a25f6d073..767a1c4b0 100644 --- a/src/task/work.ts +++ b/src/task/work.ts @@ -16,7 +16,7 @@ import { NullStorageProvider, StorageProvider } from "../storage"; import { Logger, sleep } from "../utils"; import { Batch } from "./batch"; import { NetworkNode } from "../network"; -import { Readable } from "stream"; +import { RemoteProcess } from "./spawn"; export type Worker = ( ctx: WorkContext, @@ -137,25 +137,24 @@ export class WorkContext { } /** - * Execute an executable on provider and return Promise of ReadableStream - * that streams Result objects containing the stdout and stderr of the command - * while it is being executed. + * Execute an executable on provider and return RemoteProcess object + * that contain stdout and stderr Readable * * @param commandLine Shell command to execute. * @param options Additional run options. */ - async runAndStream(commandLine: string, options?: Omit): Promise; + async spawn(commandLine: string, options?: Omit): Promise; /** * @param executable Executable to run. * @param args Executable arguments. * @param options Additional run options. */ - async runAndStream(executable: string, args: string[], options?: Omit): Promise; - async runAndStream( + async spawn(executable: string, args: string[], options?: CommandOptions): Promise; + async spawn( exeOrCmd: string, - argsOrOptions?: string[] | Omit, - options?: Omit, - ): Promise { + argsOrOptions?: string[] | CommandOptions, + options?: CommandOptions, + ): Promise { const isArray = Array.isArray(argsOrOptions); const capture: Capture = { stdout: { stream: { format: "string" } }, @@ -167,13 +166,16 @@ export class WorkContext { const script = new Script([run]); // In this case, the script consists only of the run command, // so we skip the execution of script.before and script.after - return this.activity.execute(script.getExeScriptRequest(), true, options?.timeout).catch((e) => { - throw new Error( - `Script execution failed for command: ${JSON.stringify(run.toJson())}. ${ - e?.response?.data?.message || e?.message || e - }`, - ); - }); + const streamOfActivityResults = await this.activity + .execute(script.getExeScriptRequest(), true, options?.timeout) + .catch((e) => { + throw new Error( + `Script execution failed for command: ${JSON.stringify(run.toJson())}. ${ + e?.response?.data?.message || e?.message || e + }`, + ); + }); + return new RemoteProcess(streamOfActivityResults); } /** diff --git a/tests/e2e/tasks.spec.ts b/tests/e2e/tasks.spec.ts index 4384f76ad..f8d94d64a 100644 --- a/tests/e2e/tasks.spec.ts +++ b/tests/e2e/tasks.spec.ts @@ -180,22 +180,22 @@ describe("Task Executor", function () { expect(["192.168.0.2", "192.168.0.3"]).toContain(result); }); - it("should run simple task and get results as stream", async () => { + it("should run simple by spawn command as external process", async () => { executor = await TaskExecutor.create({ package: "golem/alpine:latest", logger, }); - const results: Result[] = []; - await executor.run(async (ctx) => { - // for some reason we do not receive events for very simple commands, - // it is probably related to a bug where the command ends and the event does not have time to be triggered or handled - // after creating the EventSource connection to yagna... to investigate. - // for now, sleep 1 has been added, which solves the problem temporarily - const streamOfResults = await ctx.runAndStream("sleep 1 && echo 'Hello World'"); - for await (const result of streamOfResults) results.push(result); - }); - expect(results[0].stdout).toContain("Hello World"); - expect(results[0].result).toContain("Ok"); + let stdout = ""; + let stderr = ""; + const finalResult = await executor.run(async (ctx) => { + const remoteProcess = await ctx.spawn("sleep 1 && echo 'Hello World' && echo 'Hello Golem' >&2"); + remoteProcess.stdout.on("data", (data) => (stdout += data.trim())); + remoteProcess.stderr.on("data", (data) => (stderr += data.trim())); + return remoteProcess.waitForExit(); + }); + expect(stdout).toContain("Hello World"); + expect(stderr).toContain("Hello World"); + expect(finalResult?.result).toContain("Ok"); expect(logger.logs).toContain("Demand published on the market"); expect(logger.logs).toContain("New proposal has been received"); expect(logger.logs).toContain("Proposal has been responded");