Skip to content

Commit

Permalink
chore: refactored runAndStream to spawn and RemoteProcess
Browse files Browse the repository at this point in the history
  • Loading branch information
mgordel committed Nov 6, 2023
1 parent 7f6dacb commit bf67242
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 49 deletions.
2 changes: 1 addition & 1 deletion examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"scripts": {
"hello": "node ./hello-world/hello.mjs",
"run": "ts-node ./simple-usage/run.ts",
"runStream": "ts-node ./simple-usage/runStream.ts",
"spawn": "ts-node ./simple-usage/spawn.ts",
"map": "ts-node ./simple-usage/map.ts",
"forEach": "ts-node ./simple-usage/forEach.ts",
"fileTransfer": "ts-node ./simple-usage/fileTransfer.ts",
Expand Down
8 changes: 0 additions & 8 deletions examples/simple-usage/runStream.ts

This file was deleted.

15 changes: 15 additions & 0 deletions examples/simple-usage/spawn.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { TaskExecutor } from "@golem-sdk/golem-js";

const executor = await TaskExecutor.create("golem/alpine:latest");
const finalResult = await executor.run(async (ctx) => {
const remoteProcess = await ctx.spawn("sleep 1 && echo 'Hello World' && echo 'Hello Golem'");
remoteProcess.stdout.on("data", (data) => console.log("stdout>", data));
remoteProcess.stderr.on("data", (data) => console.error("stderr>", data));

const finalResult = await remoteProcess.waitForExit();
return finalResult;
});

console.log(finalResult);

await executor.end();
53 changes: 53 additions & 0 deletions src/task/spawn.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { Readable, Transform } from "stream";
import { Result } from "../activity";

export class RemoteProcess {
readonly stdout: Readable;
readonly stderr: Readable;
private lastResult?: Result;
private streamError?: Error;
private defaultTimeout = 20_000;
constructor(private streamOfActivityResults: Readable) {
this.streamOfActivityResults.on("data", (data) => (this.lastResult = data));
this.streamOfActivityResults.on("error", (error) => (this.streamError = error));
const { stdout, stderr } = this.transformResultsStream();
this.stdout = stdout;
this.stderr = stderr;
}

waitForExit(timeout?: number): Promise<Result> {
return new Promise((res, rej) => {
const timeoutId = setTimeout(
() => rej(new Error("The waiting time for the final result has been exceeded")),
timeout ?? this.defaultTimeout,
);
this.streamOfActivityResults.on("close", () => {
clearTimeout(timeoutId);
if (this.lastResult) {
res(this.lastResult);
} else {
rej(new Error(`An error occurred while retrieving the results. ${this.streamError}`));
}
});
});
}

private transformResultsStream(): { stdout: Readable; stderr: Readable } {
const stdoutTransform = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
callback(null, chunk?.stdout ?? null);
},
});
const stderrTransform = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
callback(null, chunk?.stderr ?? null);
},
});
return {
stdout: this.streamOfActivityResults.pipe(stdoutTransform),
stderr: this.streamOfActivityResults.pipe(stderrTransform),
};
}
}
11 changes: 0 additions & 11 deletions src/task/work.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,6 @@ describe("Work Context", () => {
});
});

describe("runAndStream()", () => {
it("should execute runAndStream command", async () => {
const expectedResult = ActivityMock.createResult({ stdout: "Ok" });
activity.mockResults([expectedResult]);
const streamOfResults = await context.runAndStream("rm -rf");
for await (const result of streamOfResults) {
expect(result).toBe(expectedResult);
}
});
});

describe("transfer()", () => {
it("should execute transfer command", async () => {
const result = ActivityMock.createResult({ stdout: "Ok" });
Expand Down
36 changes: 19 additions & 17 deletions src/task/work.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { NullStorageProvider, StorageProvider } from "../storage";
import { Logger, sleep } from "../utils";
import { Batch } from "./batch";
import { NetworkNode } from "../network";
import { Readable } from "stream";
import { RemoteProcess } from "./spawn";

export type Worker<InputType = unknown, OutputType = unknown> = (
ctx: WorkContext,
Expand Down Expand Up @@ -137,25 +137,24 @@ export class WorkContext {
}

/**
* Execute an executable on provider and return Promise of ReadableStream
* that streams Result objects containing the stdout and stderr of the command
* while it is being executed.
* Execute an executable on provider and return RemoteProcess object
* that contain stdout and stderr Readable
*
* @param commandLine Shell command to execute.
* @param options Additional run options.
*/
async runAndStream(commandLine: string, options?: Omit<CommandOptions, "capture">): Promise<Readable>;
async spawn(commandLine: string, options?: Omit<CommandOptions, "capture">): Promise<RemoteProcess>;
/**
* @param executable Executable to run.
* @param args Executable arguments.
* @param options Additional run options.
*/
async runAndStream(executable: string, args: string[], options?: Omit<CommandOptions, "capture">): Promise<Readable>;
async runAndStream(
async spawn(executable: string, args: string[], options?: CommandOptions): Promise<RemoteProcess>;
async spawn(
exeOrCmd: string,
argsOrOptions?: string[] | Omit<CommandOptions, "capture">,
options?: Omit<CommandOptions, "capture">,
): Promise<Readable> {
argsOrOptions?: string[] | CommandOptions,
options?: CommandOptions,
): Promise<RemoteProcess> {
const isArray = Array.isArray(argsOrOptions);
const capture: Capture = {
stdout: { stream: { format: "string" } },
Expand All @@ -167,13 +166,16 @@ export class WorkContext {
const script = new Script([run]);
// In this case, the script consists only of the run command,
// so we skip the execution of script.before and script.after
return this.activity.execute(script.getExeScriptRequest(), true, options?.timeout).catch((e) => {
throw new Error(
`Script execution failed for command: ${JSON.stringify(run.toJson())}. ${
e?.response?.data?.message || e?.message || e
}`,
);
});
const streamOfActivityResults = await this.activity
.execute(script.getExeScriptRequest(), true, options?.timeout)
.catch((e) => {
throw new Error(
`Script execution failed for command: ${JSON.stringify(run.toJson())}. ${
e?.response?.data?.message || e?.message || e
}`,
);
});
return new RemoteProcess(streamOfActivityResults);
}

/**
Expand Down
24 changes: 12 additions & 12 deletions tests/e2e/tasks.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,22 +180,22 @@ describe("Task Executor", function () {
expect(["192.168.0.2", "192.168.0.3"]).toContain(result);
});

it("should run simple task and get results as stream", async () => {
it("should run simple by spawn command as external process", async () => {
executor = await TaskExecutor.create({
package: "golem/alpine:latest",
logger,
});
const results: Result[] = [];
await executor.run(async (ctx) => {
// for some reason we do not receive events for very simple commands,
// it is probably related to a bug where the command ends and the event does not have time to be triggered or handled
// after creating the EventSource connection to yagna... to investigate.
// for now, sleep 1 has been added, which solves the problem temporarily
const streamOfResults = await ctx.runAndStream("sleep 1 && echo 'Hello World'");
for await (const result of streamOfResults) results.push(result);
});
expect(results[0].stdout).toContain("Hello World");
expect(results[0].result).toContain("Ok");
let stdout = "";
let stderr = "";
const finalResult = await executor.run(async (ctx) => {
const remoteProcess = await ctx.spawn("sleep 1 && echo 'Hello World' && echo 'Hello Golem' >&2");
remoteProcess.stdout.on("data", (data) => (stdout += data.trim()));
remoteProcess.stderr.on("data", (data) => (stderr += data.trim()));
return remoteProcess.waitForExit();
});
expect(stdout).toContain("Hello World");
expect(stderr).toContain("Hello World");
expect(finalResult?.result).toContain("Ok");
expect(logger.logs).toContain("Demand published on the market");
expect(logger.logs).toContain("New proposal has been received");
expect(logger.logs).toContain("Proposal has been responded");
Expand Down

0 comments on commit bf67242

Please sign in to comment.