From d932cf12edd3f827b64a8134849a617d232b6f7c Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Thu, 5 Oct 2023 16:15:45 +0200 Subject: [PATCH] fix(gftp): fixed bug with stream write after end JST-429: Fixed gftp bug with incorrect initialization when there are no tasks in the executor. Additionally, gftp has been refactored, getting rid of the old @rauschma/stringio dependency which has been replaced with native nodejs modules. --- package.json | 1 - src/executor/executor.ts | 14 ++++++++------ src/storage/gftp.ts | 40 +++++++++++++--------------------------- 3 files changed, 21 insertions(+), 34 deletions(-) diff --git a/package.json b/package.json index f47fd040d..72a639ded 100644 --- a/package.json +++ b/package.json @@ -48,7 +48,6 @@ "node": ">=16.0.0" }, "dependencies": { - "@rauschma/stringio": "^1.4.0", "axios": "^1.1.3", "bottleneck": "^2.19.5", "collect.js": "^4.34.3", diff --git a/src/executor/executor.ts b/src/executor/executor.ts index eeb2eb637..456403855 100644 --- a/src/executor/executor.ts +++ b/src/executor/executor.ts @@ -205,13 +205,15 @@ export class TaskExecutor { this.logger?.debug("Initializing task executor services..."); const allocations = await this.paymentService.createAllocation(); - this.marketService.run(taskPackage, allocations).catch((e) => this.handleCriticalError(e)); - this.agreementPoolService.run().catch((e) => this.handleCriticalError(e)); - this.paymentService.run().catch((e) => this.handleCriticalError(e)); + await Promise.all([ + this.marketService.run(taskPackage, allocations), + this.agreementPoolService.run(), + this.paymentService.run(), + this.networkService?.run(), + this.statsService.run(), + this.storageProvider?.init(), + ]).catch((e) => this.handleCriticalError(e)); this.taskService.run().catch((e) => this.handleCriticalError(e)); - this.networkService?.run().catch((e) => this.handleCriticalError(e)); - this.statsService.run().catch((e) => this.handleCriticalError(e)); - this.storageProvider?.init().catch((e) => this.handleCriticalError(e)); if (runtimeContextChecker.isNode) this.handleCancelEvent(); this.options.eventTarget.dispatchEvent(new Events.ComputationStarted()); this.logger?.info( diff --git a/src/storage/gftp.ts b/src/storage/gftp.ts index 099d62246..2c54bde93 100644 --- a/src/storage/gftp.ts +++ b/src/storage/gftp.ts @@ -1,13 +1,13 @@ -import { StorageProvider, StorageProviderDataCallback } from "./provider"; +import { StorageProvider } from "./provider"; import { Logger, runtimeContextChecker } from "../utils"; import path from "path"; import fs from "fs"; -import { chomp, chunksToLinesAsync, streamEnd, streamWrite } from "@rauschma/stringio"; import cp from "child_process"; +import readline from "node:readline/promises"; export class GftpStorageProvider implements StorageProvider { - private gftpServerProcess; - private reader; + private gftpServerProcess?: cp.ChildProcess; + private readline?: readline.Interface; /** * All published URLs to be release on close(). @@ -52,9 +52,9 @@ export class GftpStorageProvider implements StorageProvider { this.gftpServerProcess?.stdout?.setEncoding("utf-8"); this.gftpServerProcess?.stderr?.setEncoding("utf-8"); - - this.gftpServerProcess.stdout.on("data", (data) => this.logger?.debug(`GFTP server stdout: ${data}`)); - this.gftpServerProcess.stderr.on("data", (data) => this.logger?.error(`GFTP server stderr: ${data}`)); + if (this.gftpServerProcess?.stdout && this.gftpServerProcess?.stdin) { + this.readline = readline.createInterface(this.gftpServerProcess.stdout, this.gftpServerProcess.stdin); + } }); } @@ -70,17 +70,12 @@ export class GftpStorageProvider implements StorageProvider { return file_name; } - private getGftpServerProcess() { - return this.gftpServerProcess; - } - async receiveFile(path: string): Promise { const { url } = await this.jsonrpc("receive", { output_file: path }); return url; } - // eslint-disable-next-line @typescript-eslint/no-unused-vars - receiveData(callback: StorageProviderDataCallback): Promise { + receiveData(): Promise { throw new Error("receiveData is not implemented in GftpStorageProvider"); } @@ -102,8 +97,7 @@ export class GftpStorageProvider implements StorageProvider { return url; } - // eslint-disable-next-line @typescript-eslint/no-unused-vars - release(urls: string[]): Promise { + release(): Promise { // NOTE: Due to GFTP's handling of file Ids (hashes), all files with same content will share IDs, so releasing // one might break transfer of another one. Therefore, we release all files on close(). return Promise.resolve(undefined); @@ -119,21 +113,19 @@ export class GftpStorageProvider implements StorageProvider { async close() { await this.releaseAll(); - const stream = this.getGftpServerProcess(); - if (stream) await streamEnd(this.getGftpServerProcess().stdin); + this.readline?.close(); + this.gftpServerProcess?.kill(); } private async jsonrpc(method: string, params: object = {}) { if (!this.isInitiated()) await this.init(); - if (!this.reader) this.reader = this.readStream(this.getGftpServerProcess().stdout); const paramsStr = JSON.stringify(params); const query = `{"jsonrpc": "2.0", "id": "1", "method": "${method}", "params": ${paramsStr}}\n`; let valueStr = ""; - await streamWrite(this.getGftpServerProcess().stdin, query); try { - const { value } = await this.reader.next(); + const value = await this.readline?.question(query); if (!value) throw "Unable to get GFTP command result"; - const { result } = JSON.parse(value as string); + const { result } = JSON.parse(value); valueStr = value; if (result === undefined) throw value; return result; @@ -144,12 +136,6 @@ export class GftpStorageProvider implements StorageProvider { } } - async *readStream(readable) { - for await (const line of chunksToLinesAsync(readable)) { - yield chomp(line); - } - } - private async uploadStream(stream: AsyncGenerator): Promise { // FIXME: temp file is never deleted. const file_name = await this.generateTempFileName();