Skip to content

Commit

Permalink
fix(gftp): fixed bug with stream write after end
Browse files Browse the repository at this point in the history
JST-429: Fixed gftp bug with incorrect initialization
when there are no tasks in the executor.
Additionally, gftp has been refactored,
getting rid of the old @rauschma/stringio dependency
which has been replaced with native nodejs modules.
  • Loading branch information
mgordel committed Oct 5, 2023
1 parent 8dd67e1 commit d932cf1
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 34 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
"node": ">=16.0.0"
},
"dependencies": {
"@rauschma/stringio": "^1.4.0",
"axios": "^1.1.3",
"bottleneck": "^2.19.5",
"collect.js": "^4.34.3",
Expand Down
14 changes: 8 additions & 6 deletions src/executor/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,15 @@ export class TaskExecutor {

this.logger?.debug("Initializing task executor services...");
const allocations = await this.paymentService.createAllocation();
this.marketService.run(taskPackage, allocations).catch((e) => this.handleCriticalError(e));
this.agreementPoolService.run().catch((e) => this.handleCriticalError(e));
this.paymentService.run().catch((e) => this.handleCriticalError(e));
await Promise.all([
this.marketService.run(taskPackage, allocations),
this.agreementPoolService.run(),
this.paymentService.run(),
this.networkService?.run(),
this.statsService.run(),
this.storageProvider?.init(),
]).catch((e) => this.handleCriticalError(e));
this.taskService.run().catch((e) => this.handleCriticalError(e));
this.networkService?.run().catch((e) => this.handleCriticalError(e));
this.statsService.run().catch((e) => this.handleCriticalError(e));
this.storageProvider?.init().catch((e) => this.handleCriticalError(e));
if (runtimeContextChecker.isNode) this.handleCancelEvent();
this.options.eventTarget.dispatchEvent(new Events.ComputationStarted());
this.logger?.info(
Expand Down
40 changes: 13 additions & 27 deletions src/storage/gftp.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { StorageProvider, StorageProviderDataCallback } from "./provider";
import { StorageProvider } from "./provider";
import { Logger, runtimeContextChecker } from "../utils";
import path from "path";
import fs from "fs";
import { chomp, chunksToLinesAsync, streamEnd, streamWrite } from "@rauschma/stringio";
import cp from "child_process";
import readline from "node:readline/promises";

export class GftpStorageProvider implements StorageProvider {
private gftpServerProcess;
private reader;
private gftpServerProcess?: cp.ChildProcess;
private readline?: readline.Interface;

/**
* All published URLs to be release on close().
Expand Down Expand Up @@ -52,9 +52,9 @@ export class GftpStorageProvider implements StorageProvider {

this.gftpServerProcess?.stdout?.setEncoding("utf-8");
this.gftpServerProcess?.stderr?.setEncoding("utf-8");

this.gftpServerProcess.stdout.on("data", (data) => this.logger?.debug(`GFTP server stdout: ${data}`));
this.gftpServerProcess.stderr.on("data", (data) => this.logger?.error(`GFTP server stderr: ${data}`));
if (this.gftpServerProcess?.stdout && this.gftpServerProcess?.stdin) {
this.readline = readline.createInterface(this.gftpServerProcess.stdout, this.gftpServerProcess.stdin);
}
});
}

Expand All @@ -70,17 +70,12 @@ export class GftpStorageProvider implements StorageProvider {
return file_name;
}

private getGftpServerProcess() {
return this.gftpServerProcess;
}

async receiveFile(path: string): Promise<string> {
const { url } = await this.jsonrpc("receive", { output_file: path });
return url;
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
receiveData(callback: StorageProviderDataCallback): Promise<string> {
receiveData(): Promise<string> {
throw new Error("receiveData is not implemented in GftpStorageProvider");
}

Expand All @@ -102,8 +97,7 @@ export class GftpStorageProvider implements StorageProvider {
return url;
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
release(urls: string[]): Promise<void> {
release(): Promise<void> {
// NOTE: Due to GFTP's handling of file Ids (hashes), all files with same content will share IDs, so releasing
// one might break transfer of another one. Therefore, we release all files on close().
return Promise.resolve(undefined);
Expand All @@ -119,21 +113,19 @@ export class GftpStorageProvider implements StorageProvider {

async close() {
await this.releaseAll();
const stream = this.getGftpServerProcess();
if (stream) await streamEnd(this.getGftpServerProcess().stdin);
this.readline?.close();
this.gftpServerProcess?.kill();
}

private async jsonrpc(method: string, params: object = {}) {
if (!this.isInitiated()) await this.init();
if (!this.reader) this.reader = this.readStream(this.getGftpServerProcess().stdout);
const paramsStr = JSON.stringify(params);
const query = `{"jsonrpc": "2.0", "id": "1", "method": "${method}", "params": ${paramsStr}}\n`;
let valueStr = "";
await streamWrite(this.getGftpServerProcess().stdin, query);
try {
const { value } = await this.reader.next();
const value = await this.readline?.question(query);
if (!value) throw "Unable to get GFTP command result";
const { result } = JSON.parse(value as string);
const { result } = JSON.parse(value);
valueStr = value;
if (result === undefined) throw value;
return result;
Expand All @@ -144,12 +136,6 @@ export class GftpStorageProvider implements StorageProvider {
}
}

async *readStream(readable) {
for await (const line of chunksToLinesAsync(readable)) {
yield chomp(line);
}
}

private async uploadStream(stream: AsyncGenerator<Buffer>): Promise<string> {
// FIXME: temp file is never deleted.
const file_name = await this.generateTempFileName();
Expand Down

0 comments on commit d932cf1

Please sign in to comment.