From 74b37022974fd0bbd603179b7d72310b806f973f Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Wed, 9 Oct 2024 17:26:12 +0200 Subject: [PATCH] chore: add 'ws' import in node --- src/shared/storage/ws.ts | 41 +++++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/src/shared/storage/ws.ts b/src/shared/storage/ws.ts index c86ec5449..23ec807a4 100644 --- a/src/shared/storage/ws.ts +++ b/src/shared/storage/ws.ts @@ -5,7 +5,8 @@ import { encode, toObject } from "flatbuffers/js/flexbuffers.js"; import * as jsSha3 from "js-sha3"; import { defaultLogger, isBrowser, Logger, YagnaApi } from "../utils"; import { GolemInternalError } from "../error/golem-error"; -import fsPromises from "fs/promises"; +import fsPromises, { FileHandle } from "fs/promises"; +import WebSocket from "ws"; export interface WebSocketStorageProviderOptions { logger?: Logger; @@ -61,6 +62,7 @@ export class WebSocketStorageProvider implements StorageProvider { private services = new Map(); private logger: Logger; private ready = false; + private openHandles = new Set(); constructor( private readonly yagnaApi: YagnaApi, @@ -69,8 +71,9 @@ export class WebSocketStorageProvider implements StorageProvider { this.logger = options?.logger || defaultLogger("storage"); } - close(): Promise { + async close(): Promise { this.ready = false; + await Promise.allSettled(Array.from(this.openHandles).map((handle) => handle.close())); return this.release(Array.from(this.services.keys())); } @@ -84,6 +87,10 @@ export class WebSocketStorageProvider implements StorageProvider { const ws = await this.createSocket(fileInfo, ["GetMetadata", "GetChunk"]); ws.addEventListener("message", (event) => { + if (!(event.data instanceof ArrayBuffer)) { + this.logger.error("Received non-ArrayBuffer data from the socket", { data: event.data }); + return; + } const req = toObject(event.data) as GsbRequestPublishUnion; if (req.component === "GetMetadata") { this.respond(ws, req.id, { fileSize: data.byteLength }); @@ -112,9 +119,15 @@ export class WebSocketStorageProvider implements StorageProvider { const fileStats = await fsPromises.stat(src); const fileSize = fileStats.size; - const fd = await fsPromises.open(src, "r"); + const fileHandle = await fsPromises.open(src, "r"); + this.openHandles.add(fileHandle); ws.addEventListener("message", async (event) => { + if (!(event.data instanceof ArrayBuffer)) { + this.logger.error("Received non-ArrayBuffer data from the socket", { data: event.data }); + return; + } + const req = toObject(event.data) as GsbRequestPublishUnion; if (req.component === "GetMetadata") { @@ -126,7 +139,7 @@ export class WebSocketStorageProvider implements StorageProvider { const chunk = Buffer.alloc(chunkSize); try { - await fd.read(chunk, 0, chunkSize, offset); + await fileHandle.read(chunk, 0, chunkSize, offset); this.respond(ws, req.id, { content: chunk, offset, @@ -134,7 +147,8 @@ export class WebSocketStorageProvider implements StorageProvider { } finally { // After the last chunk, close the file descriptor if (offset + chunkSize >= fileSize) { - await fd.close(); + await fileHandle.close(); + this.openHandles.delete(fileHandle); } } } else { @@ -151,6 +165,10 @@ export class WebSocketStorageProvider implements StorageProvider { const ws = await this.createSocket(fileInfo, ["UploadChunk", "UploadFinished"]); ws.addEventListener("message", (event) => { + if (!(event.data instanceof ArrayBuffer)) { + this.logger.error("Received non-ArrayBuffer data from the socket", { data: event.data }); + return; + } const req = toObject(event.data) as GsbRequestReceiveUnion; if (req.component === "UploadChunk") { data.push(req.payload.chunk); @@ -176,17 +194,22 @@ export class WebSocketStorageProvider implements StorageProvider { const fileInfo = await this.createFileInfo(); const fileHandle = await fsPromises.open(path, "w"); + this.openHandles.add(fileHandle); const ws = await this.createSocket(fileInfo, ["UploadChunk", "UploadFinished"]); - ws.addEventListener("message", (event) => { + ws.addEventListener("message", async (event) => { + if (!(event.data instanceof ArrayBuffer)) { + this.logger.error("Received non-ArrayBuffer data from the socket", { data: event.data }); + return; + } const req = toObject(event.data) as GsbRequestReceiveUnion; if (req.component === "UploadChunk") { - fileHandle.write(req.payload.chunk.content); + await fileHandle.write(req.payload.chunk.content); this.respond(ws, req.id, null); } else if (req.component === "UploadFinished") { this.respond(ws, req.id, null); - - fileHandle.close(); + await fileHandle.close(); + this.openHandles.delete(fileHandle); } else { this.logger.error(`Unsupported message in receiveFile(): ${(req as GsbRequest).component}`); }