diff --git a/examples/simple-usage/spawn.ts b/examples/simple-usage/spawn.ts index 9aafc7329..145439fbf 100644 --- a/examples/simple-usage/spawn.ts +++ b/examples/simple-usage/spawn.ts @@ -2,7 +2,7 @@ import { TaskExecutor } from "@golem-sdk/golem-js"; const executor = await TaskExecutor.create("golem/alpine:latest"); const finalResult = await executor.run(async (ctx) => { - const remoteProcess = await ctx.spawn("sleep 1 && echo 'Hello World' && echo 'Hello Golem'"); + const remoteProcess = await ctx.spawn("sleep 1 && echo 'Hello World' && echo 'Hello Golem' >&2"); remoteProcess.stdout.on("data", (data) => console.log("stdout>", data)); remoteProcess.stderr.on("data", (data) => console.error("stderr>", data)); diff --git a/src/task/spawn.ts b/src/task/spawn.ts index 1fd40035c..3c8177a93 100644 --- a/src/task/spawn.ts +++ b/src/task/spawn.ts @@ -8,7 +8,9 @@ export class RemoteProcess { private streamError?: Error; private defaultTimeout = 20_000; constructor(private streamOfActivityResults: Readable) { - this.streamOfActivityResults.on("data", (data) => (this.lastResult = data)); + this.streamOfActivityResults.on("data", (data) => { + this.lastResult = data; + }); this.streamOfActivityResults.on("error", (error) => (this.streamError = error)); const { stdout, stderr } = this.transformResultsStream(); this.stdout = stdout; @@ -21,14 +23,16 @@ export class RemoteProcess { () => rej(new Error("The waiting time for the final result has been exceeded")), timeout ?? this.defaultTimeout, ); - this.streamOfActivityResults.on("close", () => { + const end = () => { clearTimeout(timeoutId); if (this.lastResult) { res(this.lastResult); } else { rej(new Error(`An error occurred while retrieving the results. ${this.streamError}`)); } - }); + }; + if (this.streamOfActivityResults.closed) return end(); + this.streamOfActivityResults.on("close", () => end()); }); } @@ -36,13 +40,13 @@ export class RemoteProcess { const stdoutTransform = new Transform({ objectMode: true, transform(chunk, encoding, callback) { - callback(null, chunk?.stdout ?? null); + callback(null, chunk?.stdout); }, }); const stderrTransform = new Transform({ objectMode: true, transform(chunk, encoding, callback) { - callback(null, chunk?.stderr ?? null); + callback(null, chunk?.stderr); }, }); return { diff --git a/src/task/work.spec.ts b/src/task/work.spec.ts index 362d07de3..2d361740f 100644 --- a/src/task/work.spec.ts +++ b/src/task/work.spec.ts @@ -46,6 +46,22 @@ describe("Work Context", () => { }); }); + describe("spawn()", () => { + it("should execute spawn command", async () => { + const expectedResult = ActivityMock.createResult({ stdout: "Ok", stderr: "Error", isBatchFinished: true }); + activity.mockResults([expectedResult]); + const remoteProcess = await context.spawn("rm -rf"); + for await (const result of remoteProcess.stdout) { + expect(result).toBe("Ok"); + } + for await (const result of remoteProcess.stderr) { + expect(result).toBe("Error"); + } + const finalResult = await remoteProcess.waitForExit(); + expect(finalResult.result).toBe("Ok"); + }); + }); + describe("transfer()", () => { it("should execute transfer command", async () => { const result = ActivityMock.createResult({ stdout: "Ok" });