diff --git a/packages/common/nbstore/package.json b/packages/common/nbstore/package.json index c1658ead8db88..1d848de9526a0 100644 --- a/packages/common/nbstore/package.json +++ b/packages/common/nbstore/package.json @@ -5,7 +5,8 @@ "private": true, "sideEffects": false, "exports": { - ".": "./src/index.ts" + ".": "./src/index.ts", + "./op": "./src/op/index.ts" }, "dependencies": { "@toeverything/infra": "workspace:*", diff --git a/packages/common/nbstore/src/impls/index.ts b/packages/common/nbstore/src/impls/index.ts new file mode 100644 index 0000000000000..b7acf6cb06510 --- /dev/null +++ b/packages/common/nbstore/src/impls/index.ts @@ -0,0 +1,19 @@ +import type { Storage } from '../storage'; + +type StorageConstructor = new (...args: any[]) => Storage; + +export const storages: StorageConstructor[] = []; + +// in next pr +// eslint-disable-next-line sonarjs/no-empty-collection +const AvailableStorageImplementations = storages.reduce( + (acc, curr) => { + acc[curr.name] = curr; + return acc; + }, + {} as Record +); + +export const getAvailableStorageImplementations = (name: string) => { + return AvailableStorageImplementations[name]; +}; diff --git a/packages/common/nbstore/src/op/consumer.ts b/packages/common/nbstore/src/op/consumer.ts new file mode 100644 index 0000000000000..80764e855c696 --- /dev/null +++ b/packages/common/nbstore/src/op/consumer.ts @@ -0,0 +1,134 @@ +import type { OpConsumer } from '@toeverything/infra/op'; +import { Observable } from 'rxjs'; + +import { getAvailableStorageImplementations } from '../impls'; +import { + BlobStorage, + DocStorage, + HistoricalDocStorage, + SpaceStorage, + type Storage, + type StorageOptions, + SyncStorage, +} from '../storage'; +import type { SpaceStorageOps } from './ops'; + +export class SpaceStorageConsumer extends SpaceStorage { + constructor(private readonly consumer: OpConsumer) { + super([]); + this.registerConnectionHandlers(); + this.listen(); + } + + listen() { + this.consumer.listen(); + } + + add(name: string, options: StorageOptions) { + const Storage = getAvailableStorageImplementations(name); + const storage = new Storage(options); + this.storages.set(storage.storageType, storage); + this.registerStorageHandlers(storage); + } + + override async destroy() { + await super.destroy(); + this.consumer.destroy(); + } + + private registerConnectionHandlers() { + this.consumer.register('addStorage', ({ name, opts }) => { + this.add(name, opts); + }); + this.consumer.register('connect', this.connect.bind(this)); + this.consumer.register('disconnect', this.disconnect.bind(this)); + this.consumer.register('connection', () => { + return new Observable(subscriber => { + subscriber.add( + this.on('connection', payload => { + subscriber.next(payload); + }) + ); + }); + }); + this.consumer.register('destroy', this.destroy.bind(this)); + } + + private registerStorageHandlers(storage: Storage) { + if (storage instanceof DocStorage) { + this.registerDocHandlers(storage); + } else if (storage instanceof BlobStorage) { + this.registerBlobHandlers(storage); + } else if (storage instanceof SyncStorage) { + this.registerSyncHandlers(storage); + } + } + + private registerDocHandlers(storage: DocStorage) { + this.consumer.register('getDoc', storage.getDoc.bind(storage)); + this.consumer.register('getDocDiff', ({ docId, state }) => { + return storage.getDocDiff(docId, state); + }); + this.consumer.register( + 'pushDocUpdate', + storage.pushDocUpdate.bind(storage) + ); + this.consumer.register( + 'getDocTimestamps', + storage.getDocTimestamps.bind(storage) + ); + this.consumer.register('deleteDoc', storage.deleteDoc.bind(storage)); + this.consumer.register('subscribeDocUpdate', () => { + return new Observable(subscriber => { + subscriber.add( + storage.subscribeDocUpdate(update => { + subscriber.next(update); + }) + ); + }); + }); + + if (storage instanceof HistoricalDocStorage) { + this.consumer.register('listHistory', ({ docId, filter }) => { + return storage.listHistories(docId, filter); + }); + this.consumer.register('getHistory', ({ docId, timestamp }) => { + return storage.getHistory(docId, timestamp); + }); + this.consumer.register('deleteHistory', ({ docId, timestamp }) => { + return storage.deleteHistory(docId, timestamp); + }); + this.consumer.register('rollbackDoc', ({ docId, timestamp }) => { + return storage.rollbackDoc(docId, timestamp); + }); + } + } + + private registerBlobHandlers(storage: BlobStorage) { + this.consumer.register('getBlob', storage.get.bind(storage)); + this.consumer.register('setBlob', storage.set.bind(storage)); + this.consumer.register('deleteBlob', ({ key, permanently }) => { + return storage.delete(key, permanently); + }); + this.consumer.register('listBlobs', storage.list.bind(storage)); + this.consumer.register('releaseBlobs', storage.release.bind(storage)); + } + + private registerSyncHandlers(storage: SyncStorage) { + this.consumer.register( + 'getPeerClocks', + storage.getPeerClocks.bind(storage) + ); + this.consumer.register('setPeerClock', ({ peer, ...clock }) => { + return storage.setPeerClock(peer, clock); + }); + this.consumer.register( + 'getPeerPushedClocks', + storage.getPeerPushedClocks.bind(storage) + ); + this.consumer.register('setPeerPushedClock', ({ peer, ...clock }) => { + return storage.setPeerPushedClock(peer, clock); + }); + this.consumer.register('clearClocks', storage.clearClocks.bind(storage)); + } +} diff --git a/packages/common/nbstore/src/op/index.ts b/packages/common/nbstore/src/op/index.ts new file mode 100644 index 0000000000000..bd2cc616b9368 --- /dev/null +++ b/packages/common/nbstore/src/op/index.ts @@ -0,0 +1,51 @@ +import { OpClient } from '@toeverything/infra/op'; + +import type { Storage } from '../storage'; +import type { SpaceStorageOps } from './ops'; + +export class SpaceStorageClient extends OpClient { + /** + * Adding a storage implementation to the backend. + * + * NOTE: + * Because the storage beckend might be put behind a worker, we cant pass the instance but only + * the constructor name and its options to let the backend construct the instance. + */ + async addStorage Storage>( + Impl: T, + ...opts: ConstructorParameters + ) { + await this.call('addStorage', { name: Impl.name, opts: opts[0] }); + } + + async connect() { + await this.call('connect'); + } + + async disconnect() { + await this.call('disconnect'); + } + + override async destroy() { + await this.call('destroy'); + super.destroy(); + } + + connection$() { + return this.ob$('connection'); + } +} + +export class SpaceStorageWorkerClient extends SpaceStorageClient { + private readonly worker: Worker; + constructor() { + const worker = new Worker(new URL('./worker.ts', import.meta.url)); + super(worker); + this.worker = worker; + } + + override async destroy() { + await super.destroy(); + this.worker.terminate(); + } +} diff --git a/packages/common/nbstore/src/op/ops.ts b/packages/common/nbstore/src/op/ops.ts new file mode 100644 index 0000000000000..10ac262774009 --- /dev/null +++ b/packages/common/nbstore/src/op/ops.ts @@ -0,0 +1,58 @@ +import { type OpSchema } from '@toeverything/infra/op'; + +import type { ConnectionStatus } from '../connection'; +import type { + BlobRecord, + DocClock, + DocClocks, + DocDiff, + DocRecord, + DocUpdate, + HistoryFilter, + ListedBlobRecord, + ListedHistory, + StorageOptions, + StorageType, +} from '../storage'; + +export interface SpaceStorageOps extends OpSchema { + // init + addStorage: [{ name: string; opts: StorageOptions }, void]; + + // connection + connect: [void, void]; + disconnect: [void, void]; + connection: [ + void, + { storage: StorageType; status: ConnectionStatus; error?: Error }, + ]; + destroy: [void, void]; + + // doc + getDoc: [string, DocRecord | null]; + getDocDiff: [{ docId: string; state?: Uint8Array }, DocDiff | null]; + pushDocUpdate: [DocUpdate, DocClock]; + getDocTimestamps: [Date, DocClocks]; + deleteDoc: [string, void]; + subscribeDocUpdate: [void, DocRecord]; + + // history + listHistory: [{ docId: string; filter?: HistoryFilter }, ListedHistory[]]; + getHistory: [DocClock, DocRecord | null]; + deleteHistory: [DocClock, void]; + rollbackDoc: [DocClock & { editor?: string }, void]; + + // blob + getBlob: [string, BlobRecord | null]; + setBlob: [BlobRecord, void]; + deleteBlob: [{ key: string; permanently: boolean }, void]; + releaseBlobs: [void, void]; + listBlobs: [void, ListedBlobRecord[]]; + + // sync + getPeerClocks: [string, DocClocks]; + setPeerClock: [{ peer: string } & DocClock, void]; + getPeerPushedClocks: [string, DocClocks]; + setPeerPushedClock: [{ peer: string } & DocClock, void]; + clearClocks: [void, void]; +} diff --git a/packages/common/nbstore/src/op/worker.ts b/packages/common/nbstore/src/op/worker.ts new file mode 100644 index 0000000000000..62b85b2c5b2c2 --- /dev/null +++ b/packages/common/nbstore/src/op/worker.ts @@ -0,0 +1,11 @@ +import { OpConsumer } from '@toeverything/infra/op'; + +import { SpaceStorageConsumer } from './consumer'; +import type { SpaceStorageOps } from './ops'; + +const consumer = new SpaceStorageConsumer( + // @ts-expect-error safe + new OpConsumer(self) +); + +consumer.listen();