diff --git a/packages/common/nbstore/package.json b/packages/common/nbstore/package.json index 1d848de9526a0..5fc2f1083e002 100644 --- a/packages/common/nbstore/package.json +++ b/packages/common/nbstore/package.json @@ -6,7 +6,9 @@ "sideEffects": false, "exports": { ".": "./src/index.ts", - "./op": "./src/op/index.ts" + "./op": "./src/op/index.ts", + "./idb": "./src/impls/idb/index.ts", + "./idb/v1": "./src/impls/idb/v1/index.ts" }, "dependencies": { "@toeverything/infra": "workspace:*", @@ -14,5 +16,11 @@ "lodash-es": "^4.17.21", "rxjs": "^7.8.1", "yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch" + }, + "devDependencies": { + "idb": "^8.0.0" + }, + "peerDependencies": { + "idb": "^8.0.0" } } diff --git a/packages/common/nbstore/src/impls/idb/blob.ts b/packages/common/nbstore/src/impls/idb/blob.ts new file mode 100644 index 0000000000000..2cc5a945c0991 --- /dev/null +++ b/packages/common/nbstore/src/impls/idb/blob.ts @@ -0,0 +1,89 @@ +import { share } from '../../connection'; +import { + type BlobRecord, + BlobStorage, + type ListedBlobRecord, +} from '../../storage'; +import { IDBConnection } from './db'; + +export class IndexedDBBlobStorage extends BlobStorage { + readonly connection = share(new IDBConnection(this.options)); + + get db() { + return this.connection.inner; + } + + override async get(key: string) { + const trx = this.db.transaction(['blobs', 'blobData'], 'readonly'); + const blob = await trx.objectStore('blobs').get(key); + const data = await trx.objectStore('blobData').get(key); + + if (!blob || blob.deletedAt || !data) { + return null; + } + + return { + ...blob, + data: data.data, + }; + } + + override async set(blob: BlobRecord) { + const trx = this.db.transaction(['blobs', 'blobData'], 'readwrite'); + await trx.objectStore('blobs').put({ + key: blob.key, + mime: blob.mime, + size: blob.data.byteLength, + createdAt: new Date(), + deletedAt: null, + }); + await trx.objectStore('blobData').put({ + key: blob.key, + data: blob.data, + }); + } + + override async delete(key: string, permanently: boolean) { + if (permanently) { + const trx = this.db.transaction(['blobs', 'blobData'], 'readwrite'); + await trx.objectStore('blobs').delete(key); + await trx.objectStore('blobData').delete(key); + } else { + const trx = this.db.transaction('blobs', 'readwrite'); + const blob = await trx.store.get(key); + if (blob) { + await trx.store.put({ + ...blob, + deletedAt: new Date(), + }); + } + } + } + + override async release() { + const trx = this.db.transaction(['blobs', 'blobData'], 'readwrite'); + + const it = trx.objectStore('blobs').iterate(); + + for await (const item of it) { + if (item.value.deletedAt) { + await item.delete(); + await trx.objectStore('blobData').delete(item.value.key); + } + } + } + + override async list() { + const trx = this.db.transaction('blobs', 'readonly'); + const it = trx.store.iterate(); + + const blobs: ListedBlobRecord[] = []; + for await (const item of it) { + if (!item.value.deletedAt) { + blobs.push(item.value); + } + } + + return blobs; + } +} diff --git a/packages/common/nbstore/src/impls/idb/db.ts b/packages/common/nbstore/src/impls/idb/db.ts new file mode 100644 index 0000000000000..65facaa4e9a29 --- /dev/null +++ b/packages/common/nbstore/src/impls/idb/db.ts @@ -0,0 +1,43 @@ +import { type IDBPDatabase, openDB } from 'idb'; + +import { Connection } from '../../connection'; +import type { StorageOptions } from '../../storage'; +import { type DocStorageSchema, migrator } from './schema'; + +export class IDBConnection extends Connection> { + private readonly dbName = `${this.opts.peer}:${this.opts.type}:${this.opts.id}`; + + override get shareId() { + return `idb(${migrator.version}):${this.dbName}`; + } + + constructor(private readonly opts: StorageOptions) { + super(); + } + + override async doConnect() { + return openDB(this.dbName, migrator.version, { + upgrade: migrator.migrate, + blocking: () => { + // if, for example, an tab with newer version is opened, this function will be called. + // we should close current connection to allow the new version to upgrade the db. + this.close( + new Error('Blocking a new version. Closing the connection.') + ); + }, + blocked: () => { + // fallback to retry auto retry + this.setStatus('error', new Error('Blocked by other tabs.')); + }, + }); + } + + override async doDisconnect() { + this.close(); + } + + private close(error?: Error) { + this.maybeConnection?.close(); + this.setStatus('closed', error); + } +} diff --git a/packages/common/nbstore/src/impls/idb/doc.ts b/packages/common/nbstore/src/impls/idb/doc.ts new file mode 100644 index 0000000000000..d977b83ec284f --- /dev/null +++ b/packages/common/nbstore/src/impls/idb/doc.ts @@ -0,0 +1,118 @@ +import { share } from '../../connection'; +import { + type DocClocks, + type DocRecord, + DocStorage, + type DocUpdate, +} from '../../storage'; +import { IDBConnection } from './db'; + +export class IndexedDBDocStorage extends DocStorage { + readonly connection = share(new IDBConnection(this.options)); + + get db() { + return this.connection.inner; + } + + override async pushDocUpdate(update: DocUpdate) { + const trx = this.db.transaction(['updates', 'clocks'], 'readwrite'); + const timestamp = new Date(); + await trx.objectStore('updates').add({ + ...update, + createdAt: timestamp, + }); + + await trx.objectStore('clocks').put({ docId: update.docId, timestamp }); + + return { docId: update.docId, timestamp }; + } + + protected override async getDocSnapshot(docId: string) { + const trx = this.db.transaction('snapshots', 'readonly'); + const record = await trx.store.get(docId); + + if (!record) { + return null; + } + + return { + docId, + bin: record.bin, + timestamp: record.updatedAt, + }; + } + + override async deleteDoc(docId: string) { + const trx = this.db.transaction( + ['snapshots', 'updates', 'clocks'], + 'readwrite' + ); + + const idx = trx.objectStore('updates').index('docId'); + const iter = idx.iterate(IDBKeyRange.only(docId)); + + for await (const { value } of iter) { + await trx.objectStore('updates').delete([value.docId, value.createdAt]); + } + + await trx.objectStore('snapshots').delete(docId); + await trx.objectStore('clocks').delete(docId); + } + + override async getDocTimestamps(after: Date = new Date(0)) { + const trx = this.db.transaction('clocks', 'readonly'); + + const clocks = await trx.store.getAll(); + + return clocks.reduce((ret, cur) => { + if (cur.timestamp > after) { + ret[cur.docId] = cur.timestamp; + } + return ret; + }, {} as DocClocks); + } + + protected override async setDocSnapshot( + snapshot: DocRecord + ): Promise { + const trx = this.db.transaction('snapshots', 'readwrite'); + const record = await trx.store.get(snapshot.docId); + + if (!record || record.updatedAt < snapshot.timestamp) { + await trx.store.put({ + docId: snapshot.docId, + bin: snapshot.bin, + createdAt: record?.createdAt ?? snapshot.timestamp, + updatedAt: snapshot.timestamp, + }); + } + + trx.commit(); + return true; + } + + protected override async getDocUpdates(docId: string): Promise { + const trx = this.db.transaction('updates', 'readonly'); + const updates = await trx.store.index('docId').getAll(docId); + + return updates.map(update => ({ + docId, + bin: update.bin, + timestamp: update.createdAt, + })); + } + + protected override async markUpdatesMerged( + docId: string, + updates: DocRecord[] + ): Promise { + const trx = this.db.transaction('updates', 'readwrite'); + + await Promise.all( + updates.map(update => trx.store.delete([docId, update.timestamp])) + ); + + trx.commit(); + return updates.length; + } +} diff --git a/packages/common/nbstore/src/impls/idb/index.ts b/packages/common/nbstore/src/impls/idb/index.ts new file mode 100644 index 0000000000000..debe733b43801 --- /dev/null +++ b/packages/common/nbstore/src/impls/idb/index.ts @@ -0,0 +1,3 @@ +export * from './blob'; +export * from './doc'; +export * from './sync'; diff --git a/packages/common/nbstore/src/impls/idb/schema.ts b/packages/common/nbstore/src/impls/idb/schema.ts new file mode 100644 index 0000000000000..953e9f395dff8 --- /dev/null +++ b/packages/common/nbstore/src/impls/idb/schema.ts @@ -0,0 +1,174 @@ +import { type DBSchema, type OpenDBCallbacks } from 'idb'; + +/** +IndexedDB + > DB(workspace:${workspaceId}) + > Table(Snapshots) + > Table(Updates) + > Table(...) + +Table(Snapshots) +| docId | blob | createdAt | updatedAt | +|-------|------|-----------|-----------| +| str | bin | Date | Date | + +Table(Updates) +| id | docId | blob | createdAt | +|----|-------|------|-----------| +|auto| str | bin | Date | + +Table(Clocks) +| docId | clock | +|-------|-----------| +| str | Date | + +Table(Blobs) +| key | mime | size | createdAt | deletedAt | +|-----|------|------|-----------|-----------| +| str | str | num | Date | Date | + +Table(BlobData) +| key | data | +|-----|------| +| str | bin | + +Table(PeerClocks) +| peer | docId | clock | pushed | +|------|-------|-----------|-----------| +| str | str | Date | Date | + */ +export interface DocStorageSchema extends DBSchema { + snapshots: { + key: string; + value: { + docId: string; + bin: Uint8Array; + createdAt: Date; + updatedAt: Date; + }; + indexes: { + updatedAt: Date; + }; + }; + updates: { + key: [string, Date]; + value: { + docId: string; + bin: Uint8Array; + createdAt: Date; + }; + indexes: { + docId: string; + }; + }; + clocks: { + key: string; + value: { + docId: string; + timestamp: Date; + }; + indexes: { + timestamp: Date; + }; + }; + blobs: { + key: string; + value: { + key: string; + mime: string; + size: number; + createdAt: Date; + deletedAt: Date | null; + }; + }; + blobData: { + key: string; + value: { + key: string; + data: Uint8Array; + }; + }; + peerClocks: { + key: [string, string]; + value: { + peer: string; + docId: string; + clock: Date; + pushedClock: Date; + }; + indexes: { + peer: string; + }; + }; +} + +const migrate: OpenDBCallbacks['upgrade'] = ( + db, + oldVersion, + _newVersion, + trx +) => { + if (!oldVersion) { + oldVersion = 0; + } + + for (let i = oldVersion; i < migrations.length; i++) { + migrations[i](db, trx); + } +}; + +type MigrateParameters = Parameters< + NonNullable['upgrade']> +>; +type Migrate = (db: MigrateParameters[0], trx: MigrateParameters[3]) => void; + +// START REGION: migrations +const init: Migrate = db => { + const snapshots = db.createObjectStore('snapshots', { + keyPath: 'docId', + autoIncrement: false, + }); + + snapshots.createIndex('updatedAt', 'updatedAt', { unique: false }); + + const updates = db.createObjectStore('updates', { + keyPath: ['docId', 'createdAt'], + autoIncrement: false, + }); + + updates.createIndex('docId', 'docId', { unique: false }); + + const clocks = db.createObjectStore('clocks', { + keyPath: 'docId', + autoIncrement: false, + }); + + clocks.createIndex('timestamp', 'timestamp', { unique: false }); + + const peerClocks = db.createObjectStore('peerClocks', { + keyPath: ['peer', 'docId'], + autoIncrement: false, + }); + + peerClocks.createIndex('peer', 'peer', { unique: false }); + + db.createObjectStore('blobs', { + keyPath: 'key', + autoIncrement: false, + }); + + db.createObjectStore('blobData', { + keyPath: 'key', + autoIncrement: false, + }); +}; +// END REGION + +// 1. all schema changed should be put in migrations +// 2. order matters +const migrations: Migrate[] = [init]; + +export const migrator = { + version: migrations.length, + migrate, +}; diff --git a/packages/common/nbstore/src/impls/idb/sync.ts b/packages/common/nbstore/src/impls/idb/sync.ts new file mode 100644 index 0000000000000..9eea7febac76a --- /dev/null +++ b/packages/common/nbstore/src/impls/idb/sync.ts @@ -0,0 +1,66 @@ +import { share } from '../../connection'; +import { type DocClock, type DocClocks, SyncStorage } from '../../storage'; +import { IDBConnection } from './db'; +export class IndexedDBSyncStorage extends SyncStorage { + readonly connection = share(new IDBConnection(this.options)); + + get db() { + return this.connection.inner; + } + + override async getPeerClocks(peer: string) { + const trx = this.db.transaction('peerClocks', 'readonly'); + + const records = await trx.store.index('peer').getAll(peer); + + return records.reduce((clocks, { docId, clock }) => { + clocks[docId] = clock; + return clocks; + }, {} as DocClocks); + } + + override async setPeerClock(peer: string, clock: DocClock) { + const trx = this.db.transaction('peerClocks', 'readwrite'); + const record = await trx.store.get([peer, clock.docId]); + + if (!record || record.clock < clock.timestamp) { + await trx.store.put({ + peer, + docId: clock.docId, + clock: clock.timestamp, + pushedClock: record?.pushedClock ?? new Date(0), + }); + } + } + + override async getPeerPushedClocks(peer: string) { + const trx = this.db.transaction('peerClocks', 'readonly'); + + const records = await trx.store.index('peer').getAll(peer); + + return records.reduce((clocks, { docId, pushedClock }) => { + clocks[docId] = pushedClock; + return clocks; + }, {} as DocClocks); + } + + override async setPeerPushedClock(peer: string, clock: DocClock) { + const trx = this.db.transaction('peerClocks', 'readwrite'); + const record = await trx.store.get([peer, clock.docId]); + + if (!record || record.pushedClock < clock.timestamp) { + await trx.store.put({ + peer, + docId: clock.docId, + clock: record?.clock ?? new Date(0), + pushedClock: clock.timestamp, + }); + } + } + + override async clearClocks() { + const trx = this.db.transaction('peerClocks', 'readwrite'); + + await trx.store.clear(); + } +} diff --git a/packages/common/nbstore/src/impls/idb/v1/blob.ts b/packages/common/nbstore/src/impls/idb/v1/blob.ts new file mode 100644 index 0000000000000..fcd370f62b0ea --- /dev/null +++ b/packages/common/nbstore/src/impls/idb/v1/blob.ts @@ -0,0 +1,62 @@ +import { share } from '../../../connection'; +import { BlobStorage, type ListedBlobRecord } from '../../../storage'; +import { BlobIDBConnection } from './db'; + +/** + * @deprecated readonly + */ +export class IndexedDBV1BlobStorage extends BlobStorage { + readonly connection = share(new BlobIDBConnection(this.spaceId)); + + get db() { + return this.connection.inner; + } + + override async get(key: string) { + const trx = this.db.transaction('blob', 'readonly'); + const blob = await trx.store.get(key); + if (!blob) { + return null; + } + + return { + key, + mime: '', + createdAt: new Date(), + data: new Uint8Array(blob), + }; + } + + override async delete(key: string, permanently: boolean) { + if (permanently) { + const trx = this.db.transaction('blob', 'readwrite'); + await trx.store.delete(key); + } + } + + override async list() { + const trx = this.db.transaction('blob', 'readonly'); + const it = trx.store.iterate(); + + const records: ListedBlobRecord[] = []; + + for await (const { key, value } of it) { + records.push({ + key, + mime: '', + size: value.byteLength, + createdAt: new Date(), + }); + } + + return records; + } + + override async set() { + // no more writes + } + + override async release() { + // no more writes + } +} diff --git a/packages/common/nbstore/src/impls/idb/v1/db.ts b/packages/common/nbstore/src/impls/idb/v1/db.ts new file mode 100644 index 0000000000000..6497e1b6992e2 --- /dev/null +++ b/packages/common/nbstore/src/impls/idb/v1/db.ts @@ -0,0 +1,63 @@ +import { type DBSchema, type IDBPDatabase, openDB } from 'idb'; + +import { Connection } from '../../../connection'; + +export interface DocDBSchema extends DBSchema { + workspace: { + key: string; + value: { + id: string; + updates: { + timestamp: number; + update: Uint8Array; + }[]; + }; + }; +} + +export class DocIDBConnection extends Connection> { + override get shareId() { + return 'idb(old):affine-local'; + } + + override async doConnect() { + return openDB('affine-local', 1, { + upgrade: db => { + db.createObjectStore('workspace', { keyPath: 'id' }); + }, + }); + } + + override async doDisconnect(conn: IDBPDatabase) { + conn.close(); + } +} + +export interface BlobDBSchema extends DBSchema { + blob: { + key: string; + value: ArrayBuffer; + }; +} + +export class BlobIDBConnection extends Connection> { + constructor(private readonly workspaceId: string) { + super(); + } + + override get shareId() { + return `idb(old-blob):${this.workspaceId}`; + } + + override async doConnect() { + return openDB(`${this.workspaceId}_blob`, 1, { + upgrade: db => { + db.createObjectStore('blob'); + }, + }); + } + + override async doDisconnect(conn: IDBPDatabase) { + conn.close(); + } +} diff --git a/packages/common/nbstore/src/impls/idb/v1/doc.ts b/packages/common/nbstore/src/impls/idb/v1/doc.ts new file mode 100644 index 0000000000000..2660c2d608df1 --- /dev/null +++ b/packages/common/nbstore/src/impls/idb/v1/doc.ts @@ -0,0 +1,71 @@ +import { share } from '../../../connection'; +import { type DocRecord, DocStorage, type DocUpdate } from '../../../storage'; +import { DocIDBConnection } from './db'; + +/** + * @deprecated readonly + */ +export class IndexedDBV1DocStorage extends DocStorage { + readonly connection = share(new DocIDBConnection()); + + get db() { + return this.connection.inner; + } + + get name() { + return 'idb(old)'; + } + + override async getDoc(docId: string) { + const trx = this.db.transaction('workspace', 'readonly'); + const record = await trx.store.get(docId); + + if (!record?.updates.length) { + return null; + } + + if (record.updates.length === 1) { + return { + docId, + bin: record.updates[0].update, + timestamp: new Date(record.updates[0].timestamp), + }; + } + + return { + docId, + bin: await this.mergeUpdates(record.updates.map(update => update.update)), + timestamp: new Date(record.updates.at(-1)?.timestamp ?? Date.now()), + }; + } + + protected override async getDocSnapshot() { + return null; + } + + override async pushDocUpdate(update: DocUpdate) { + // no more writes to old db + return { docId: update.docId, timestamp: new Date() }; + } + + override async deleteDoc(docId: string) { + const trx = this.db.transaction('workspace', 'readwrite'); + await trx.store.delete(docId); + } + + override async getDocTimestamps() { + return {}; + } + + protected override async setDocSnapshot(): Promise { + return false; + } + + protected override async getDocUpdates(): Promise { + return []; + } + + protected override async markUpdatesMerged(): Promise { + return 0; + } +} diff --git a/packages/common/nbstore/src/impls/idb/v1/index.ts b/packages/common/nbstore/src/impls/idb/v1/index.ts new file mode 100644 index 0000000000000..d476ae6eb9b92 --- /dev/null +++ b/packages/common/nbstore/src/impls/idb/v1/index.ts @@ -0,0 +1,2 @@ +export * from './blob'; +export * from './doc'; diff --git a/packages/common/nbstore/src/impls/index.ts b/packages/common/nbstore/src/impls/index.ts index b7acf6cb06510..8bce204798b51 100644 --- a/packages/common/nbstore/src/impls/index.ts +++ b/packages/common/nbstore/src/impls/index.ts @@ -1,11 +1,26 @@ import type { Storage } from '../storage'; +import { + IndexedDBBlobStorage, + IndexedDBDocStorage, + IndexedDBSyncStorage, +} from './idb'; +import { IndexedDBV1BlobStorage, IndexedDBV1DocStorage } from './idb/v1'; type StorageConstructor = new (...args: any[]) => Storage; -export const storages: StorageConstructor[] = []; +const idb: StorageConstructor[] = [ + IndexedDBDocStorage, + IndexedDBBlobStorage, + IndexedDBSyncStorage, +]; + +const idbv1: StorageConstructor[] = [ + IndexedDBV1DocStorage, + IndexedDBV1BlobStorage, +]; + +export const storages: StorageConstructor[] = [...idbv1, ...idb]; -// in next pr -// eslint-disable-next-line sonarjs/no-empty-collection const AvailableStorageImplementations = storages.reduce( (acc, curr) => { acc[curr.name] = curr; diff --git a/yarn.lock b/yarn.lock index d4be8065aea03..f8fe4106d112b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -729,9 +729,12 @@ __metadata: dependencies: "@toeverything/infra": "workspace:*" eventemitter2: "npm:^6.4.9" + idb: "npm:^8.0.0" lodash-es: "npm:^4.17.21" rxjs: "npm:^7.8.1" yjs: "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch" + peerDependencies: + idb: ^8.0.0 languageName: unknown linkType: soft