diff --git a/packages/backend/server/src/core/sync/gateway.ts b/packages/backend/server/src/core/sync/gateway.ts index b922e477bb6a3..b6e8809287a12 100644 --- a/packages/backend/server/src/core/sync/gateway.ts +++ b/packages/backend/server/src/core/sync/gateway.ts @@ -113,6 +113,7 @@ interface UpdateAwarenessMessage { docId: string; awarenessUpdate: string; } + @WebSocketGateway() export class SpaceSyncGateway implements OnGatewayConnection, OnGatewayDisconnect @@ -181,26 +182,6 @@ export class SpaceSyncGateway } } - async joinWorkspace( - client: Socket, - room: `${string}:${'sync' | 'awareness'}` - ) { - await client.join(room); - } - - async leaveWorkspace( - client: Socket, - room: `${string}:${'sync' | 'awareness'}` - ) { - await client.leave(room); - } - - assertInWorkspace(client: Socket, room: `${string}:${'sync' | 'awareness'}`) { - if (!client.rooms.has(room)) { - throw new NotInSpace({ spaceId: room.split(':')[0] }); - } - } - // v3 @SubscribeMessage('space:join') async onJoinSpace( diff --git a/packages/common/doc-storage/src/impls/cloud/blob.ts b/packages/common/doc-storage/src/impls/cloud/blob.ts index 0abe7594b70c3..c1326fec51ef5 100644 --- a/packages/common/doc-storage/src/impls/cloud/blob.ts +++ b/packages/common/doc-storage/src/impls/cloud/blob.ts @@ -1,55 +1,44 @@ import { type Blob, BlobStorage, - type DocStorageOptions, + type BlobStorageOptions, type ListedBlob, } from '../../storage'; -import type { Socket } from './socket'; - -interface CloudBlobStorageOptions extends DocStorageOptions { +import { + base64ToUint8Array, + type ServerEventsMap, + type Socket, + SocketProtocol, + uint8ArrayToBase64, +} from './socket'; + +interface CloudBlobStorageOptions extends BlobStorageOptions { socket: Socket; } export class CloudBlobStorage extends BlobStorage { - get socket() { + private get socket() { return this.options.socket; } override async connect(): Promise { - // the event will be polled, there is no need to wait for socket to be connected - await this.clientHandShake(); - // this.socket.on('space:broadcast-blob-update', this.onServerUpdates); - } - - private async clientHandShake() { - const res = await this.socket.emitWithAck('space:join', { - spaceType: this.spaceType, - spaceId: this.spaceId, - clientVersion: BUILD_CONFIG.appVersion, - }); - - if ('error' in res) { - // TODO(@forehalo): use [UserFriendlyError] - throw new Error(res.error.message); - } + await SocketProtocol.joinBlob(this.socket, this.spaceType, this.spaceId); + this.socket.on('space:broadcast-blob-update', this.onServerUpdate); } override async disconnect(): Promise { - this.socket.emit('space:leave', { - spaceType: this.spaceType, - spaceId: this.spaceId, - }); - // this.socket.off('space:broadcast-doc-updates', this.onServerUpdate); + SocketProtocol.leaveBlob(this.socket, this.spaceType, this.spaceId); + this.socket.off('space:broadcast-blob-update', this.onServerUpdate); } - // onServerUpdate: ServerEventsMap['space:broadcast-blob-update'] = message => { - // if ( - // this.spaceType === message.spaceType && - // this.spaceId === message.spaceId - // ) { - // // how do we deal with the data? - // } - // }; + onServerUpdate: ServerEventsMap['space:broadcast-blob-update'] = message => { + if ( + this.spaceType === message.spaceType && + this.spaceId === message.spaceId + ) { + // how do we deal with the data? + } + }; override async getBlob(key: string): Promise { const res = await this.socket.emitWithAck('space:get-blob', { @@ -109,32 +98,3 @@ export class CloudBlobStorage extends BlobStorage { return res.data; } } - -export function uint8ArrayToBase64(array: Uint8Array): Promise { - return new Promise(resolve => { - // Create a blob from the Uint8Array - const blob = new Blob([array]); - - const reader = new FileReader(); - reader.onload = function () { - const dataUrl = reader.result as string | null; - if (!dataUrl) { - resolve(''); - return; - } - // The result includes the `data:` URL prefix and the MIME type. We only want the Base64 data - const base64 = dataUrl.split(',')[1]; - resolve(base64); - }; - - reader.readAsDataURL(blob); - }); -} - -export function base64ToUint8Array(base64: string) { - const binaryString = atob(base64); - const binaryArray = binaryString.split('').map(function (char) { - return char.charCodeAt(0); - }); - return new Uint8Array(binaryArray); -} diff --git a/packages/common/doc-storage/src/impls/cloud/doc.ts b/packages/common/doc-storage/src/impls/cloud/doc.ts index 6708090a5f2e3..183835c701653 100644 --- a/packages/common/doc-storage/src/impls/cloud/doc.ts +++ b/packages/common/doc-storage/src/impls/cloud/doc.ts @@ -1,45 +1,34 @@ import { DocStorage, type DocStorageOptions } from '../../storage'; -import type { ServerEventsMap, Socket } from './socket'; +import { + base64ToUint8Array, + type ServerEventsMap, + type Socket, + SocketProtocol, + uint8ArrayToBase64, +} from './socket'; interface CloudDocStorageOptions extends DocStorageOptions { + endpoint: string; socket: Socket; } export class CloudDocStorage extends DocStorage { - get name() { - // @ts-expect-error we need it - return this.options.socket.io.uri; - } - - get socket() { + private get socket() { return this.options.socket; } - override async connect(): Promise { - // the event will be polled, there is no need to wait for socket to be connected - await this.clientHandShake(); - this.socket.on('space:broadcast-doc-updates', this.onServerUpdates); + get name() { + return this.options.endpoint; } - private async clientHandShake() { - const res = await this.socket.emitWithAck('space:join', { - spaceType: this.spaceType, - spaceId: this.spaceId, - clientVersion: BUILD_CONFIG.appVersion, - }); - - if ('error' in res) { - // TODO(@forehalo): use [UserFriendlyError] - throw new Error(res.error.message); - } + override async connect(): Promise { + await SocketProtocol.join(this.socket, this.spaceType, this.spaceId); + this.socket?.on('space:broadcast-doc-updates', this.onServerUpdates); } override async disconnect(): Promise { - this.socket.emit('space:leave', { - spaceType: this.spaceType, - spaceId: this.spaceId, - }); - this.socket.off('space:broadcast-doc-updates', this.onServerUpdates); + SocketProtocol.leave(this.socket, this.spaceType, this.spaceId); + this.socket?.off('space:broadcast-doc-updates', this.onServerUpdates); } onServerUpdates: ServerEventsMap['space:broadcast-doc-updates'] = message => { @@ -152,32 +141,3 @@ export class CloudDocStorage extends DocStorage { return false; } } - -export function uint8ArrayToBase64(array: Uint8Array): Promise { - return new Promise(resolve => { - // Create a blob from the Uint8Array - const blob = new Blob([array]); - - const reader = new FileReader(); - reader.onload = function () { - const dataUrl = reader.result as string | null; - if (!dataUrl) { - resolve(''); - return; - } - // The result includes the `data:` URL prefix and the MIME type. We only want the Base64 data - const base64 = dataUrl.split(',')[1]; - resolve(base64); - }; - - reader.readAsDataURL(blob); - }); -} - -export function base64ToUint8Array(base64: string) { - const binaryString = atob(base64); - const binaryArray = binaryString.split('').map(function (char) { - return char.charCodeAt(0); - }); - return new Uint8Array(binaryArray); -} diff --git a/packages/common/doc-storage/src/impls/cloud/index.ts b/packages/common/doc-storage/src/impls/cloud/index.ts index f42f6dd754a91..d476ae6eb9b92 100644 --- a/packages/common/doc-storage/src/impls/cloud/index.ts +++ b/packages/common/doc-storage/src/impls/cloud/index.ts @@ -1 +1,2 @@ +export * from './blob'; export * from './doc'; diff --git a/packages/common/doc-storage/src/impls/cloud/socket.ts b/packages/common/doc-storage/src/impls/cloud/socket.ts index 7169728610317..a92909dc6549b 100644 --- a/packages/common/doc-storage/src/impls/cloud/socket.ts +++ b/packages/common/doc-storage/src/impls/cloud/socket.ts @@ -1,4 +1,4 @@ -import type { Socket as IO } from 'socket.io-client'; +import type { Socket as SocketIO } from 'socket.io-client'; // TODO(@forehalo): use [UserFriendlyError] interface EventError { @@ -37,6 +37,21 @@ interface ClientEvents { { clientId: string }, ]; 'space:leave': { spaceType: string; spaceId: string }; + 'space:join-awareness': [ + { + spaceType: string; + spaceId: string; + docId: string; + clientVersion: string; + }, + { clientId: string }, + ]; + 'space:leave-awareness': { + spaceType: string; + spaceId: string; + docId: string; + }; + 'space:push-doc-updates': [ { spaceType: string; spaceId: string; docId: string; updates: string[] }, { timestamp: number }, @@ -64,6 +79,11 @@ interface ClientEvents { ]; // blobs + 'space:join-blob': [ + { spaceType: string; spaceId: string }, + { clientId: string }, + ]; + 'space:leave-blob': { spaceType: string; spaceId: string }; 'space:get-blob': [ { spaceType: string; @@ -100,6 +120,13 @@ interface ClientEvents { }, { key: string; size: number }[], ]; + 'space:broadcast-blob-update': { + spaceType: string; + spaceId: string; + key: string; + data: string; + mime: string; + }; } export type ServerEventsMap = { @@ -115,4 +142,100 @@ export type ClientEventsMap = { : (data: ClientEvents[Key]) => void; }; -export type Socket = IO; +export type Socket = SocketIO; + +export function uint8ArrayToBase64(array: Uint8Array): Promise { + return new Promise(resolve => { + // Create a blob from the Uint8Array + const blob = new Blob([array]); + + const reader = new FileReader(); + reader.onload = function () { + const dataUrl = reader.result as string | null; + if (!dataUrl) { + resolve(''); + return; + } + // The result includes the `data:` URL prefix and the MIME type. We only want the Base64 data + const base64 = dataUrl.split(',')[1]; + resolve(base64); + }; + + reader.readAsDataURL(blob); + }); +} + +export function base64ToUint8Array(base64: string) { + const binaryString = atob(base64); + const binaryArray = binaryString.split('').map(function (char) { + return char.charCodeAt(0); + }); + return new Uint8Array(binaryArray); +} + +export const SocketProtocol = { + async join(socket: Socket, spaceType: string, spaceId: string) { + const res = await socket.emitWithAck('space:join', { + spaceType, + spaceId, + clientVersion: BUILD_CONFIG.appVersion, + }); + + if ('error' in res) { + // TODO(@forehalo): use [UserFriendlyError] + throw new Error(res.error.message); + } + }, + leave(socket: Socket, spaceType: string, spaceId: string) { + socket.emit('space:leave', { + spaceType, + spaceId, + }); + }, + + async joinAwareness( + socket: Socket, + spaceType: string, + spaceId: string, + docId: string + ) { + const res = await socket.emitWithAck('space:join-awareness', { + spaceType, + spaceId, + docId, + clientVersion: BUILD_CONFIG.appVersion, + }); + + if ('error' in res) { + throw new Error(res.error.message); + } + }, + leaveAwareness( + socket: Socket, + spaceType: string, + spaceId: string, + docId: string + ) { + socket.emit('space:leave-awareness', { + spaceType, + spaceId, + docId, + }); + }, + async joinBlob(socket: Socket, spaceType: string, spaceId: string) { + const res = await socket.emitWithAck('space:join-blob', { + spaceType, + spaceId, + }); + + if ('error' in res) { + throw new Error(res.error.message); + } + }, + leaveBlob(socket: Socket, spaceType: string, spaceId: string) { + socket.emit('space:leave-blob', { + spaceType, + spaceId, + }); + }, +}; diff --git a/packages/common/doc-storage/src/impls/idb/blob.ts b/packages/common/doc-storage/src/impls/idb/blob.ts index 4d03dd78e82d7..e94e0453cf0dc 100644 --- a/packages/common/doc-storage/src/impls/idb/blob.ts +++ b/packages/common/doc-storage/src/impls/idb/blob.ts @@ -1,13 +1,18 @@ -import { type Blob, BlobStorage, type ListedBlob } from '../../storage'; -import { type SpaceIDB, SpaceIndexedDbManager } from './db'; +import { + type Blob, + BlobStorage, + type BlobStorageOptions, + type ListedBlob, +} from '../../storage'; +import { type SpaceIDB } from './db'; -export class IndexedDBBlobStorage extends BlobStorage { - private db!: SpaceIDB; +export interface IndexedDBBlobStorageOptions extends BlobStorageOptions { + db: SpaceIDB; +} - override async connect(): Promise { - this.db = await SpaceIndexedDbManager.open( - `${this.spaceType}:${this.spaceId}` - ); +export class IndexedDBBlobStorage extends BlobStorage { + get db() { + return this.options.db; } override async getBlob(key: string): Promise { diff --git a/packages/common/doc-storage/src/impls/idb/db.ts b/packages/common/doc-storage/src/impls/idb/db.ts index 7790ab320056b..d1361cfa1a2a2 100644 --- a/packages/common/doc-storage/src/impls/idb/db.ts +++ b/packages/common/doc-storage/src/impls/idb/db.ts @@ -4,29 +4,24 @@ import { type DocStorageSchema, latestVersion, migrate } from './schema'; export type SpaceIDB = IDBPDatabase; -export class SpaceIndexedDbManager { - private static db: SpaceIDB | null = null; - - static async open(name: string) { - if (this.db) { - return this.db; - } - +export const IDBProtocol = { + async open(name: string) { + let db: SpaceIDB | null = null; const blocking = () => { // notify user this connection is blocking other tabs to upgrade db - this.db?.close(); + db?.close(); }; const blocked = () => { // notify user there is tab opened with old version, close it first }; - this.db = await openDB(name, latestVersion, { + db = await openDB(name, latestVersion, { upgrade: migrate, blocking, blocked, }); - return this.db; - } -} + return db; + }, +}; diff --git a/packages/common/doc-storage/src/impls/idb/doc.ts b/packages/common/doc-storage/src/impls/idb/doc.ts index caeeac8093cf2..3d221cf6a0b64 100644 --- a/packages/common/doc-storage/src/impls/idb/doc.ts +++ b/packages/common/doc-storage/src/impls/idb/doc.ts @@ -1,21 +1,22 @@ -import { type DocRecord, DocStorage, type DocUpdate } from '../../storage'; -import { type SpaceIDB, SpaceIndexedDbManager } from './db'; - -export class IndexedDBDocStorage extends DocStorage { - private db!: SpaceIDB; +import { + type DocRecord, + DocStorage, + type DocStorageOptions, + type DocUpdate, +} from '../../storage'; +import { type SpaceIDB } from './db'; + +export interface IndexedDBDocStorageOptions extends DocStorageOptions { + db: SpaceIDB; +} +export class IndexedDBDocStorage extends DocStorage { get name() { return 'idb'; } - override async connect(): Promise { - this.db = await SpaceIndexedDbManager.open( - `${this.spaceType}:${this.spaceId}` - ); - } - - override async disconnect(): Promise { - this.db.close(); + get db() { + return this.options.db; } override async pushDocUpdates( diff --git a/packages/common/doc-storage/src/impls/idb/index.ts b/packages/common/doc-storage/src/impls/idb/index.ts index f42f6dd754a91..6cc1814ca653a 100644 --- a/packages/common/doc-storage/src/impls/idb/index.ts +++ b/packages/common/doc-storage/src/impls/idb/index.ts @@ -1 +1,3 @@ +export * from './blob'; +export * from './db'; export * from './doc'; diff --git a/packages/common/doc-storage/src/impls/sqlite/db.ts b/packages/common/doc-storage/src/impls/sqlite/db.ts new file mode 100644 index 0000000000000..4e91a5892cad8 --- /dev/null +++ b/packages/common/doc-storage/src/impls/sqlite/db.ts @@ -0,0 +1,9 @@ +import { DocStorage as NativeDocStorage } from '@affine/native'; + +export const SqliteProtocol = { + async init(path: string) { + const db = new NativeDocStorage(path); + await db.connect(); + return db; + }, +}; diff --git a/packages/common/doc-storage/src/impls/sqlite/doc.ts b/packages/common/doc-storage/src/impls/sqlite/doc.ts index 570f137325318..07c553704e873 100644 --- a/packages/common/doc-storage/src/impls/sqlite/doc.ts +++ b/packages/common/doc-storage/src/impls/sqlite/doc.ts @@ -20,10 +20,6 @@ export class SqliteDocStorage extends DocStorage { return this.options.db; } - constructor(options: SqliteDocStorageOptions) { - super(options); - } - override pushDocUpdates( docId: string, updates: Uint8Array[] diff --git a/packages/common/doc-storage/src/impls/sqlite/index.ts b/packages/common/doc-storage/src/impls/sqlite/index.ts index f42f6dd754a91..6cc1814ca653a 100644 --- a/packages/common/doc-storage/src/impls/sqlite/index.ts +++ b/packages/common/doc-storage/src/impls/sqlite/index.ts @@ -1 +1,3 @@ +export * from './blob'; +export * from './db'; export * from './doc'; diff --git a/packages/common/doc-storage/src/storage/blob.ts b/packages/common/doc-storage/src/storage/blob.ts index a5d6559b4b859..aafdb7d9e42cf 100644 --- a/packages/common/doc-storage/src/storage/blob.ts +++ b/packages/common/doc-storage/src/storage/blob.ts @@ -1,9 +1,6 @@ -import { Connection } from './connection'; +import { BaseStorage, type StorageOptions } from './storage'; -export interface BlobStorageOptions { - spaceType: string; - spaceId: string; -} +export interface BlobStorageOptions extends StorageOptions {} export interface Blob { key: string; @@ -18,22 +15,7 @@ export interface ListedBlob { export abstract class BlobStorage< Options extends BlobStorageOptions = BlobStorageOptions, -> extends Connection { - public readonly options: Options; - - constructor(opts: Options) { - super(); - this.options = opts; - } - - get spaceType() { - return this.options.spaceType; - } - - get spaceId() { - return this.options.spaceId; - } - +> extends BaseStorage { abstract getBlob(key: string): Promise; abstract setBlob(blob: Blob): Promise; abstract deleteBlob(key: string, permanently: boolean): Promise; diff --git a/packages/common/doc-storage/src/storage/connection.ts b/packages/common/doc-storage/src/storage/connection.ts index f82a72fbd3931..039b1087824a7 100644 --- a/packages/common/doc-storage/src/storage/connection.ts +++ b/packages/common/doc-storage/src/storage/connection.ts @@ -1,11 +1,11 @@ export class Connection { - protected connected: boolean = false; - connect(): Promise { + connected = false; + + async connect() { this.connected = true; - return Promise.resolve(); } - disconnect(): Promise { + + async disconnect() { this.connected = false; - return Promise.resolve(); } } diff --git a/packages/common/doc-storage/src/storage/doc/doc.ts b/packages/common/doc-storage/src/storage/doc.ts similarity index 92% rename from packages/common/doc-storage/src/storage/doc/doc.ts rename to packages/common/doc-storage/src/storage/doc.ts index b6e22a92b7287..8b45d97815f38 100644 --- a/packages/common/doc-storage/src/storage/doc/doc.ts +++ b/packages/common/doc-storage/src/storage/doc.ts @@ -9,47 +9,53 @@ import { UndoManager, } from 'yjs'; -import { Connection } from '../connection'; -import { SingletonLocker } from '../lock'; -import type { - DocDiff, - DocRecord, - DocUpdate, - Editor, - HistoryFilter, -} from './types'; - -export type SpaceType = 'workspace' | 'userspace'; -export interface DocStorageOptions { - spaceType: string; +import { SingletonLocker } from './lock'; +import { BaseStorage, type StorageOptions } from './storage'; + +export interface DocRecord { spaceId: string; + docId: string; + bin: Uint8Array; + timestamp: number; + editor?: string; +} + +export interface DocDiff { + missing: Uint8Array; + state: Uint8Array; + timestamp: number; +} + +export interface DocUpdate { + bin: Uint8Array; + timestamp: number; + editor?: string; +} + +export interface HistoryFilter { + before?: number; + limit?: number; +} + +export interface Editor { + name: string; + avatarUrl: string | null; +} + +export interface DocStorageOptions extends StorageOptions { mergeUpdates?: (updates: Uint8Array[]) => Promise | Uint8Array; } export abstract class DocStorage< Opts extends DocStorageOptions = DocStorageOptions, -> extends Connection { +> extends BaseStorage { abstract get name(): string; - public readonly options: Opts; private readonly locker = new SingletonLocker(); protected readonly updatesListeners = new Set< (docId: string, updates: Uint8Array[], timestamp: number) => void >(); - get spaceType() { - return this.options.spaceType; - } - - get spaceId() { - return this.options.spaceId; - } - - constructor(options: Opts) { - super(); - this.options = options; - } - // REGION: open apis /** * Tell a binary is empty yjs binary or not. diff --git a/packages/common/doc-storage/src/storage/doc/index.ts b/packages/common/doc-storage/src/storage/doc/index.ts deleted file mode 100644 index 1fa447519627c..0000000000000 --- a/packages/common/doc-storage/src/storage/doc/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export * from './doc'; -export * from './types'; diff --git a/packages/common/doc-storage/src/storage/doc/types.ts b/packages/common/doc-storage/src/storage/doc/types.ts deleted file mode 100644 index 366eb280e4a25..0000000000000 --- a/packages/common/doc-storage/src/storage/doc/types.ts +++ /dev/null @@ -1,29 +0,0 @@ -export interface DocRecord { - spaceId: string; - docId: string; - bin: Uint8Array; - timestamp: number; - editor?: string; -} - -export interface DocDiff { - missing: Uint8Array; - state: Uint8Array; - timestamp: number; -} - -export interface DocUpdate { - bin: Uint8Array; - timestamp: number; - editor?: string; -} - -export interface HistoryFilter { - before?: number; - limit?: number; -} - -export interface Editor { - name: string; - avatarUrl: string | null; -} diff --git a/packages/common/doc-storage/src/storage/index.ts b/packages/common/doc-storage/src/storage/index.ts index d476ae6eb9b92..03832aae02de0 100644 --- a/packages/common/doc-storage/src/storage/index.ts +++ b/packages/common/doc-storage/src/storage/index.ts @@ -1,2 +1,30 @@ +import type { BlobStorage } from './blob'; +import { Connection } from './connection'; +import type { DocStorage } from './doc'; +import type { StorageOptions } from './storage'; + export * from './blob'; export * from './doc'; + +export abstract class Storage extends Connection { + abstract readonly doc: DocStorage; + abstract readonly blob: BlobStorage; + + constructor(public readonly options: Opts) { + super(); + } + + override async connect() { + if (!this.connected) { + this.connected = true; + await this.doc.connect(); + await this.blob.connect(); + } + } + + override async disconnect() { + await this.doc.disconnect(); + await this.blob.disconnect(); + this.connected = false; + } +} diff --git a/packages/common/doc-storage/src/storage/storage.ts b/packages/common/doc-storage/src/storage/storage.ts new file mode 100644 index 0000000000000..274911bc2d640 --- /dev/null +++ b/packages/common/doc-storage/src/storage/storage.ts @@ -0,0 +1,24 @@ +import { Connection } from './connection'; + +export type SpaceType = 'workspace' | 'userspace'; + +export interface StorageOptions { + type: SpaceType; + id: string; +} + +export abstract class BaseStorage< + Opts extends StorageOptions, +> extends Connection { + get spaceType() { + return this.options.type; + } + + get spaceId() { + return this.options.id; + } + + constructor(public readonly options: Opts) { + super(); + } +}