Skip to content

Commit

Permalink
chore: add 'ws' import in node
Browse files Browse the repository at this point in the history
  • Loading branch information
SewerynKras committed Oct 9, 2024
1 parent b069f54 commit 74b3702
Showing 1 changed file with 32 additions and 9 deletions.
41 changes: 32 additions & 9 deletions src/shared/storage/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import { encode, toObject } from "flatbuffers/js/flexbuffers.js";
import * as jsSha3 from "js-sha3";
import { defaultLogger, isBrowser, Logger, YagnaApi } from "../utils";
import { GolemInternalError } from "../error/golem-error";
import fsPromises from "fs/promises";
import fsPromises, { FileHandle } from "fs/promises";
import WebSocket from "ws";

export interface WebSocketStorageProviderOptions {
logger?: Logger;
Expand Down Expand Up @@ -61,6 +62,7 @@ export class WebSocketStorageProvider implements StorageProvider {
private services = new Map<string, string>();
private logger: Logger;
private ready = false;
private openHandles = new Set<FileHandle>();

constructor(
private readonly yagnaApi: YagnaApi,
Expand All @@ -69,8 +71,9 @@ export class WebSocketStorageProvider implements StorageProvider {
this.logger = options?.logger || defaultLogger("storage");
}

close(): Promise<void> {
async close(): Promise<void> {
this.ready = false;
await Promise.allSettled(Array.from(this.openHandles).map((handle) => handle.close()));
return this.release(Array.from(this.services.keys()));
}

Expand All @@ -84,6 +87,10 @@ export class WebSocketStorageProvider implements StorageProvider {

const ws = await this.createSocket(fileInfo, ["GetMetadata", "GetChunk"]);
ws.addEventListener("message", (event) => {
if (!(event.data instanceof ArrayBuffer)) {
this.logger.error("Received non-ArrayBuffer data from the socket", { data: event.data });
return;
}
const req = toObject(event.data) as GsbRequestPublishUnion;
if (req.component === "GetMetadata") {
this.respond(ws, req.id, { fileSize: data.byteLength });
Expand Down Expand Up @@ -112,9 +119,15 @@ export class WebSocketStorageProvider implements StorageProvider {
const fileStats = await fsPromises.stat(src);
const fileSize = fileStats.size;

const fd = await fsPromises.open(src, "r");
const fileHandle = await fsPromises.open(src, "r");
this.openHandles.add(fileHandle);

ws.addEventListener("message", async (event) => {
if (!(event.data instanceof ArrayBuffer)) {
this.logger.error("Received non-ArrayBuffer data from the socket", { data: event.data });
return;
}

const req = toObject(event.data) as GsbRequestPublishUnion;

if (req.component === "GetMetadata") {
Expand All @@ -126,15 +139,16 @@ export class WebSocketStorageProvider implements StorageProvider {
const chunk = Buffer.alloc(chunkSize);

try {
await fd.read(chunk, 0, chunkSize, offset);
await fileHandle.read(chunk, 0, chunkSize, offset);
this.respond(ws, req.id, {
content: chunk,
offset,
});
} finally {
// After the last chunk, close the file descriptor
if (offset + chunkSize >= fileSize) {
await fd.close();
await fileHandle.close();
this.openHandles.delete(fileHandle);
}
}
} else {
Expand All @@ -151,6 +165,10 @@ export class WebSocketStorageProvider implements StorageProvider {

const ws = await this.createSocket(fileInfo, ["UploadChunk", "UploadFinished"]);
ws.addEventListener("message", (event) => {
if (!(event.data instanceof ArrayBuffer)) {
this.logger.error("Received non-ArrayBuffer data from the socket", { data: event.data });
return;
}
const req = toObject(event.data) as GsbRequestReceiveUnion;
if (req.component === "UploadChunk") {
data.push(req.payload.chunk);
Expand All @@ -176,17 +194,22 @@ export class WebSocketStorageProvider implements StorageProvider {

const fileInfo = await this.createFileInfo();
const fileHandle = await fsPromises.open(path, "w");
this.openHandles.add(fileHandle);
const ws = await this.createSocket(fileInfo, ["UploadChunk", "UploadFinished"]);

ws.addEventListener("message", (event) => {
ws.addEventListener("message", async (event) => {
if (!(event.data instanceof ArrayBuffer)) {
this.logger.error("Received non-ArrayBuffer data from the socket", { data: event.data });
return;
}
const req = toObject(event.data) as GsbRequestReceiveUnion;
if (req.component === "UploadChunk") {
fileHandle.write(req.payload.chunk.content);
await fileHandle.write(req.payload.chunk.content);
this.respond(ws, req.id, null);
} else if (req.component === "UploadFinished") {
this.respond(ws, req.id, null);

fileHandle.close();
await fileHandle.close();
this.openHandles.delete(fileHandle);
} else {
this.logger.error(`Unsupported message in receiveFile(): ${(req as GsbRequest<void>).component}`);
}
Expand Down

0 comments on commit 74b3702

Please sign in to comment.