Skip to content

Commit

Permalink
chore(doc-storage): organize code
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Oct 21, 2024
1 parent 16b9666 commit 7395f58
Show file tree
Hide file tree
Showing 17 changed files with 296 additions and 264 deletions.
21 changes: 1 addition & 20 deletions packages/backend/server/src/core/sync/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ interface UpdateAwarenessMessage {
docId: string;
awarenessUpdate: string;
}

@WebSocketGateway()
export class SpaceSyncGateway
implements OnGatewayConnection, OnGatewayDisconnect
Expand Down Expand Up @@ -181,26 +182,6 @@ export class SpaceSyncGateway
}
}

async joinWorkspace(
client: Socket,
room: `${string}:${'sync' | 'awareness'}`
) {
await client.join(room);
}

async leaveWorkspace(
client: Socket,
room: `${string}:${'sync' | 'awareness'}`
) {
await client.leave(room);
}

assertInWorkspace(client: Socket, room: `${string}:${'sync' | 'awareness'}`) {
if (!client.rooms.has(room)) {
throw new NotInSpace({ spaceId: room.split(':')[0] });
}
}

// v3
@SubscribeMessage('space:join')
async onJoinSpace(
Expand Down
86 changes: 23 additions & 63 deletions packages/common/doc-storage/src/impls/cloud/blob.ts
Original file line number Diff line number Diff line change
@@ -1,55 +1,44 @@
import {
type Blob,
BlobStorage,
type DocStorageOptions,
type BlobStorageOptions,
type ListedBlob,
} from '../../storage';
import type { Socket } from './socket';

interface CloudBlobStorageOptions extends DocStorageOptions {
import {
base64ToUint8Array,
type ServerEventsMap,
type Socket,
SocketProtocol,
uint8ArrayToBase64,
} from './socket';

interface CloudBlobStorageOptions extends BlobStorageOptions {
socket: Socket;
}

export class CloudBlobStorage extends BlobStorage<CloudBlobStorageOptions> {
get socket() {
private get socket() {
return this.options.socket;
}

override async connect(): Promise<void> {
// the event will be polled, there is no need to wait for socket to be connected
await this.clientHandShake();
// this.socket.on('space:broadcast-blob-update', this.onServerUpdates);
}

private async clientHandShake() {
const res = await this.socket.emitWithAck('space:join', {
spaceType: this.spaceType,
spaceId: this.spaceId,
clientVersion: BUILD_CONFIG.appVersion,
});

if ('error' in res) {
// TODO(@forehalo): use [UserFriendlyError]
throw new Error(res.error.message);
}
await SocketProtocol.joinBlob(this.socket, this.spaceType, this.spaceId);
this.socket.on('space:broadcast-blob-update', this.onServerUpdate);
}

override async disconnect(): Promise<void> {
this.socket.emit('space:leave', {
spaceType: this.spaceType,
spaceId: this.spaceId,
});
// this.socket.off('space:broadcast-doc-updates', this.onServerUpdate);
SocketProtocol.leaveBlob(this.socket, this.spaceType, this.spaceId);
this.socket.off('space:broadcast-blob-update', this.onServerUpdate);
}

// onServerUpdate: ServerEventsMap['space:broadcast-blob-update'] = message => {
// if (
// this.spaceType === message.spaceType &&
// this.spaceId === message.spaceId
// ) {
// // how do we deal with the data?
// }
// };
onServerUpdate: ServerEventsMap['space:broadcast-blob-update'] = message => {
if (
this.spaceType === message.spaceType &&
this.spaceId === message.spaceId
) {
// how do we deal with the data?
}
};

override async getBlob(key: string): Promise<Blob | null> {
const res = await this.socket.emitWithAck('space:get-blob', {
Expand Down Expand Up @@ -109,32 +98,3 @@ export class CloudBlobStorage extends BlobStorage<CloudBlobStorageOptions> {
return res.data;
}
}

export function uint8ArrayToBase64(array: Uint8Array): Promise<string> {
return new Promise<string>(resolve => {
// Create a blob from the Uint8Array
const blob = new Blob([array]);

const reader = new FileReader();
reader.onload = function () {
const dataUrl = reader.result as string | null;
if (!dataUrl) {
resolve('');
return;
}
// The result includes the `data:` URL prefix and the MIME type. We only want the Base64 data
const base64 = dataUrl.split(',')[1];
resolve(base64);
};

reader.readAsDataURL(blob);
});
}

export function base64ToUint8Array(base64: string) {
const binaryString = atob(base64);
const binaryArray = binaryString.split('').map(function (char) {
return char.charCodeAt(0);
});
return new Uint8Array(binaryArray);
}
72 changes: 16 additions & 56 deletions packages/common/doc-storage/src/impls/cloud/doc.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,34 @@
import { DocStorage, type DocStorageOptions } from '../../storage';
import type { ServerEventsMap, Socket } from './socket';
import {
base64ToUint8Array,
type ServerEventsMap,
type Socket,
SocketProtocol,
uint8ArrayToBase64,
} from './socket';

interface CloudDocStorageOptions extends DocStorageOptions {
endpoint: string;
socket: Socket;
}

export class CloudDocStorage extends DocStorage<CloudDocStorageOptions> {
get name() {
// @ts-expect-error we need it
return this.options.socket.io.uri;
}

get socket() {
private get socket() {
return this.options.socket;
}

override async connect(): Promise<void> {
// the event will be polled, there is no need to wait for socket to be connected
await this.clientHandShake();
this.socket.on('space:broadcast-doc-updates', this.onServerUpdates);
get name() {
return this.options.endpoint;
}

private async clientHandShake() {
const res = await this.socket.emitWithAck('space:join', {
spaceType: this.spaceType,
spaceId: this.spaceId,
clientVersion: BUILD_CONFIG.appVersion,
});

if ('error' in res) {
// TODO(@forehalo): use [UserFriendlyError]
throw new Error(res.error.message);
}
override async connect(): Promise<void> {
await SocketProtocol.join(this.socket, this.spaceType, this.spaceId);
this.socket?.on('space:broadcast-doc-updates', this.onServerUpdates);
}

override async disconnect(): Promise<void> {
this.socket.emit('space:leave', {
spaceType: this.spaceType,
spaceId: this.spaceId,
});
this.socket.off('space:broadcast-doc-updates', this.onServerUpdates);
SocketProtocol.leave(this.socket, this.spaceType, this.spaceId);
this.socket?.off('space:broadcast-doc-updates', this.onServerUpdates);
}

onServerUpdates: ServerEventsMap['space:broadcast-doc-updates'] = message => {
Expand Down Expand Up @@ -152,32 +141,3 @@ export class CloudDocStorage extends DocStorage<CloudDocStorageOptions> {
return false;
}
}

export function uint8ArrayToBase64(array: Uint8Array): Promise<string> {
return new Promise<string>(resolve => {
// Create a blob from the Uint8Array
const blob = new Blob([array]);

const reader = new FileReader();
reader.onload = function () {
const dataUrl = reader.result as string | null;
if (!dataUrl) {
resolve('');
return;
}
// The result includes the `data:` URL prefix and the MIME type. We only want the Base64 data
const base64 = dataUrl.split(',')[1];
resolve(base64);
};

reader.readAsDataURL(blob);
});
}

export function base64ToUint8Array(base64: string) {
const binaryString = atob(base64);
const binaryArray = binaryString.split('').map(function (char) {
return char.charCodeAt(0);
});
return new Uint8Array(binaryArray);
}
1 change: 1 addition & 0 deletions packages/common/doc-storage/src/impls/cloud/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './blob';
export * from './doc';
Loading

0 comments on commit 7395f58

Please sign in to comment.