Skip to content

Commit

Permalink
chore: gftp fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
mgordel committed Oct 19, 2023
1 parent 2588d47 commit de864ed
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 111 deletions.
8 changes: 1 addition & 7 deletions src/storage/gftp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ import { Logger, runtimeContextChecker } from "../utils";
import path from "path";
import fs from "fs";
import cp from "child_process";
import readline from "node:readline/promises";

export class GftpStorageProvider implements StorageProvider {
private gftpServerProcess?: cp.ChildProcess;
private readline?: readline.Interface;

/**
* All published URLs to be release on close().
Expand Down Expand Up @@ -53,9 +51,7 @@ export class GftpStorageProvider implements StorageProvider {

this.gftpServerProcess?.stdout?.setEncoding("utf-8");
this.gftpServerProcess?.stderr?.setEncoding("utf-8");
if (this.gftpServerProcess?.stdout && this.gftpServerProcess?.stdin) {
this.readline = readline.createInterface(this.gftpServerProcess.stdout, this.gftpServerProcess.stdin);
}
this.reader = this.gftpServerProcess?.stdout?.iterator();
});
}

Expand Down Expand Up @@ -114,13 +110,11 @@ export class GftpStorageProvider implements StorageProvider {

async close() {
await this.releaseAll();
this.readline?.close();
this.gftpServerProcess?.kill();
}

private async jsonrpc(method: string, params: object = {}) {
if (!this.isInitiated()) await this.init();
if (!this.reader) this.reader = this.gftpServerProcess?.stdout?.iterator();
const paramsStr = JSON.stringify(params);
const query = `{"jsonrpc": "2.0", "id": "1", "method": "${method}", "params": ${paramsStr}}\n`;
let valueStr = "";
Expand Down
76 changes: 40 additions & 36 deletions tests/e2e/blender.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,40 +23,44 @@ const blenderParams = (frame) => ({
});

describe("Blender rendering", function () {
it("should render images by blender", async () => {
const executor = await TaskExecutor.create({
package: "golem/blender:latest",
logger,
});

executor.beforeEach(async (ctx) => {
const sourcePath = fs.realpathSync(__dirname + "/../mock/fixtures/cubes.blend");
await ctx.uploadFile(sourcePath, "/golem/resource/scene.blend");
});

const data = [0, 10, 20, 30, 40, 50];

const results = executor.map<number, string>(data, async (ctx, frame) => {
const result = await ctx
.beginBatch()
.uploadJson(blenderParams(frame), "/golem/work/params.json")
.run("/golem/entrypoints/run-blender.sh")
.downloadFile(`/golem/output/out${frame?.toString().padStart(4, "0")}.png`, `output_${frame}.png`)
.end()
.catch((error) => console.error(error.toString()));
return result ? `output_${frame}.png` : "";
});

const expectedResults = data.map((d) => `output_${d}.png`);

for await (const result of results) {
expect(expectedResults).toContain(result);
}

for (const file of expectedResults) {
expect(fs.existsSync(`${process.env.GOTH_GFTP_VOLUME || ""}${file}`)).toEqual(true);
}

await executor.end();
});
it(
"should render images by blender",
async () => {
const executor = await TaskExecutor.create({
package: "golem/blender:latest",
logger,
});

executor.beforeEach(async (ctx) => {
const sourcePath = fs.realpathSync(__dirname + "/../mock/fixtures/cubes.blend");
await ctx.uploadFile(sourcePath, "/golem/resource/scene.blend");
});

const data = [0, 10, 20, 30, 40, 50];

const results = executor.map<number, string>(data, async (ctx, frame) => {
const result = await ctx
.beginBatch()
.uploadJson(blenderParams(frame), "/golem/work/params.json")
.run("/golem/entrypoints/run-blender.sh")
.downloadFile(`/golem/output/out${frame?.toString().padStart(4, "0")}.png`, `output_${frame}.png`)
.end()
.catch((error) => console.error(error.toString()));
return result ? `output_${frame}.png` : "";
});

const expectedResults = data.map((d) => `output_${d}.png`);

for await (const result of results) {
expect(expectedResults).toContain(result);
}

for (const file of expectedResults) {
expect(fs.existsSync(`${process.env.GOTH_GFTP_VOLUME || ""}${file}`)).toEqual(true);
}

await executor.end();
},
1000 * 240,
);
});
70 changes: 37 additions & 33 deletions tests/e2e/express.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,40 +126,44 @@ describe("Express", function () {
return app;
};

it("starts the express server", async function () {
const app = getServer();
app.get("/network-status", (req, res) => {
res.json({ isInitialized: network.isInitialized() });
});
it(
"starts the express server",
async function () {
const app = getServer();
app.get("/network-status", (req, res) => {
res.json({ isInitialized: network.isInitialized() });
});

const response = await supertest(app).post("/render-scene").send({ frame: 0 });
expect(response.status).toEqual(200);
expect(response.body.jobId).toBeDefined();

const jobId = response.body.jobId;
let statusResponse = await supertest(app).get(`/job/${jobId}/status`);
expect(statusResponse.status).toEqual(200);
expect(statusResponse.body.status).toEqual(JobState.New);

await new Promise((resolve) => {
const interval = setInterval(async () => {
const statusResponse = await supertest(app).get(`/job/${jobId}/status`);
if (statusResponse.body.status === JobState.Done) {
clearInterval(interval);
resolve(undefined);
}
}, 500);
});
const response = await supertest(app).post("/render-scene").send({ frame: 0 });
expect(response.status).toEqual(200);
expect(response.body.jobId).toBeDefined();

const jobId = response.body.jobId;
let statusResponse = await supertest(app).get(`/job/${jobId}/status`);
expect(statusResponse.status).toEqual(200);
expect(statusResponse.body.status).toEqual(JobState.New);

await new Promise((resolve) => {
const interval = setInterval(async () => {
const statusResponse = await supertest(app).get(`/job/${jobId}/status`);
if (statusResponse.body.status === JobState.Done) {
clearInterval(interval);
resolve(undefined);
}
}, 500);
});

statusResponse = await supertest(app).get(`/job/${jobId}/status`);
expect(statusResponse.status).toEqual(200);
expect(statusResponse.body.status).toEqual(JobState.Done);
statusResponse = await supertest(app).get(`/job/${jobId}/status`);
expect(statusResponse.status).toEqual(200);
expect(statusResponse.body.status).toEqual(JobState.Done);

const resultResponse = await supertest(app).get(`/job/${jobId}/result`);
expect(resultResponse.status).toEqual(200);
expect(resultResponse.body).toEqual(
"Job completed successfully! See your result at http://localhost:3001/results/EXPRESS_SPEC_output_0.png",
);
expect(fs.existsSync(`EXPRESS_SPEC_output_0.png`)).toEqual(true);
});
const resultResponse = await supertest(app).get(`/job/${jobId}/result`);
expect(resultResponse.status).toEqual(200);
expect(resultResponse.body).toEqual(
"Job completed successfully! See your result at http://localhost:3001/results/EXPRESS_SPEC_output_0.png",
);
expect(fs.existsSync(`EXPRESS_SPEC_output_0.png`)).toEqual(true);
},
1000 * 240,
);
});
74 changes: 39 additions & 35 deletions tests/e2e/gftp.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,43 @@ import fs from "fs";
const logger = new LoggerMock(false);

describe("GFTP transfers", function () {
it("should upload and download big files simultaneously", async () => {
const executor = await TaskExecutor.create({
package: "golem/alpine:latest",
logger,
});

executor.beforeEach(async (ctx) => {
const sourcePath = fs.realpathSync(__dirname + "/../mock/fixtures/eiffel.blend");
await ctx.uploadFile(sourcePath, "/golem/work/eiffel.blend");
});

const data = [0, 1, 2, 3, 4, 5];

const results = executor.map(data, async (ctx, frame) => {
const result = await ctx
.beginBatch()
.run("ls -Alh /golem/work/eiffel.blend")
.downloadFile(`/golem/work/eiffel.blend`, `copy_${frame}.blend`)
.end()
.catch((error) => ctx.rejectResult(error.toString()));
return result ? `copy_${frame}.blend` : "";
});

const expectedResults = data.map((d) => `copy_${d}.blend`);

for await (const result of results) {
expect(expectedResults).toContain(result);
}

for (const file of expectedResults) {
expect(fs.existsSync(file)).toEqual(true);
}

await executor.end();
});
it(
"should upload and download big files simultaneously",
async () => {
const executor = await TaskExecutor.create({
package: "golem/alpine:latest",
logger,
});

executor.beforeEach(async (ctx) => {
const sourcePath = fs.realpathSync(__dirname + "/../mock/fixtures/eiffel.blend");
await ctx.uploadFile(sourcePath, "/golem/work/eiffel.blend");
});

const data = [0, 1, 2, 3, 4, 5];

const results = executor.map(data, async (ctx, frame) => {
const result = await ctx
.beginBatch()
.run("ls -Alh /golem/work/eiffel.blend")
.downloadFile(`/golem/work/eiffel.blend`, `copy_${frame}.blend`)
.end()
.catch((error) => ctx.rejectResult(error.toString()));
return result ? `copy_${frame}.blend` : "";
});

const expectedResults = data.map((d) => `copy_${d}.blend`);

for await (const result of results) {
expect(expectedResults).toContain(result);
}

for (const file of expectedResults) {
expect(fs.existsSync(file)).toEqual(true);
}

await executor.end();
},
1000 * 240,
);
});

0 comments on commit de864ed

Please sign in to comment.