Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JST-429: Gftp bugfix and refactor #612

Merged
merged 2 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
name: Build and unit-test on supported platforms and NodeJS versions
strategy:
matrix:
node-version: [16.x, 18.x, 20.x]
node-version: [18.x, 20.x]
os: [ubuntu-latest, windows-latest, macos-latest]

runs-on: ${{ matrix.os }}
Expand Down
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@
"author": "GolemFactory <[email protected]>",
"license": "LGPL-3.0",
"engines": {
"node": ">=16.0.0"
"node": ">=18.0.0"
},
"dependencies": {
"@rauschma/stringio": "^1.4.0",
"axios": "^1.1.3",
"bottleneck": "^2.19.5",
"collect.js": "^4.34.3",
Expand Down
14 changes: 8 additions & 6 deletions src/executor/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,15 @@ export class TaskExecutor {

this.logger?.debug("Initializing task executor services...");
const allocations = await this.paymentService.createAllocation();
this.marketService.run(taskPackage, allocations).catch((e) => this.handleCriticalError(e));
this.agreementPoolService.run().catch((e) => this.handleCriticalError(e));
this.paymentService.run().catch((e) => this.handleCriticalError(e));
await Promise.all([
this.marketService.run(taskPackage, allocations),
this.agreementPoolService.run(),
this.paymentService.run(),
this.networkService?.run(),
this.statsService.run(),
this.storageProvider?.init(),
]).catch((e) => this.handleCriticalError(e));
this.taskService.run().catch((e) => this.handleCriticalError(e));
this.networkService?.run().catch((e) => this.handleCriticalError(e));
this.statsService.run().catch((e) => this.handleCriticalError(e));
this.storageProvider?.init().catch((e) => this.handleCriticalError(e));
if (runtimeContextChecker.isNode) this.handleCancelEvent();
this.options.eventTarget.dispatchEvent(new Events.ComputationStarted());
this.logger?.info(
Expand Down
40 changes: 13 additions & 27 deletions src/storage/gftp.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { StorageProvider, StorageProviderDataCallback } from "./provider";
import { StorageProvider } from "./provider";
import { Logger, runtimeContextChecker } from "../utils";
import path from "path";
import fs from "fs";
import { chomp, chunksToLinesAsync, streamEnd, streamWrite } from "@rauschma/stringio";
import cp from "child_process";
import readline from "node:readline/promises";

export class GftpStorageProvider implements StorageProvider {
private gftpServerProcess;
private reader;
private gftpServerProcess?: cp.ChildProcess;
private readline?: readline.Interface;

/**
* All published URLs to be release on close().
Expand Down Expand Up @@ -52,9 +52,9 @@ export class GftpStorageProvider implements StorageProvider {

this.gftpServerProcess?.stdout?.setEncoding("utf-8");
this.gftpServerProcess?.stderr?.setEncoding("utf-8");

this.gftpServerProcess.stdout.on("data", (data) => this.logger?.debug(`GFTP server stdout: ${data}`));
this.gftpServerProcess.stderr.on("data", (data) => this.logger?.error(`GFTP server stderr: ${data}`));
if (this.gftpServerProcess?.stdout && this.gftpServerProcess?.stdin) {
this.readline = readline.createInterface(this.gftpServerProcess.stdout, this.gftpServerProcess.stdin);
}
});
}

Expand All @@ -70,17 +70,12 @@ export class GftpStorageProvider implements StorageProvider {
return file_name;
}

private getGftpServerProcess() {
return this.gftpServerProcess;
}

async receiveFile(path: string): Promise<string> {
const { url } = await this.jsonrpc("receive", { output_file: path });
return url;
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
receiveData(callback: StorageProviderDataCallback): Promise<string> {
receiveData(): Promise<string> {
throw new Error("receiveData is not implemented in GftpStorageProvider");
}

Expand All @@ -102,8 +97,7 @@ export class GftpStorageProvider implements StorageProvider {
return url;
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
release(urls: string[]): Promise<void> {
release(): Promise<void> {
// NOTE: Due to GFTP's handling of file Ids (hashes), all files with same content will share IDs, so releasing
// one might break transfer of another one. Therefore, we release all files on close().
return Promise.resolve(undefined);
Expand All @@ -119,21 +113,19 @@ export class GftpStorageProvider implements StorageProvider {

async close() {
await this.releaseAll();
const stream = this.getGftpServerProcess();
if (stream) await streamEnd(this.getGftpServerProcess().stdin);
this.readline?.close();
this.gftpServerProcess?.kill();
}

private async jsonrpc(method: string, params: object = {}) {
if (!this.isInitiated()) await this.init();
if (!this.reader) this.reader = this.readStream(this.getGftpServerProcess().stdout);
const paramsStr = JSON.stringify(params);
const query = `{"jsonrpc": "2.0", "id": "1", "method": "${method}", "params": ${paramsStr}}\n`;
let valueStr = "";
await streamWrite(this.getGftpServerProcess().stdin, query);
try {
const { value } = await this.reader.next();
const value = await this.readline?.question(query);
if (!value) throw "Unable to get GFTP command result";
const { result } = JSON.parse(value as string);
const { result } = JSON.parse(value);
valueStr = value;
if (result === undefined) throw value;
return result;
Expand All @@ -144,12 +136,6 @@ export class GftpStorageProvider implements StorageProvider {
}
}

async *readStream(readable) {
for await (const line of chunksToLinesAsync(readable)) {
yield chomp(line);
}
}

private async uploadStream(stream: AsyncGenerator<Buffer>): Promise<string> {
// FIXME: temp file is never deleted.
const file_name = await this.generateTempFileName();
Expand Down
Loading