Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(nbstore): add blob sync #8996

Merged
merged 1 commit into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 72 additions & 2 deletions packages/common/nbstore/src/__tests__/sync.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ import 'fake-indexeddb/auto';
import { expect, test } from 'vitest';
import { Doc as YDoc, encodeStateAsUpdate } from 'yjs';

import { IndexedDBDocStorage, IndexedDBSyncStorage } from '../impls/idb';
import {
IndexedDBBlobStorage,
IndexedDBDocStorage,
IndexedDBSyncStorage,
} from '../impls/idb';
import { SpaceStorage } from '../storage';
import { SyncEngine } from '../sync';

test('sync', async () => {
test('doc', async () => {
const doc = new YDoc();
doc.getMap('test').set('hello', 'world');
const update = encodeStateAsUpdate(doc);
Expand Down Expand Up @@ -83,3 +87,69 @@ test('sync', async () => {
expect(c?.bin).toEqual(update2);
}
});

test('blob', async () => {
const a = new IndexedDBBlobStorage({
id: 'ws1',
peer: 'a',
type: 'workspace',
});

const b = new IndexedDBBlobStorage({
id: 'ws1',
peer: 'b',
type: 'workspace',
});

const c = new IndexedDBBlobStorage({
id: 'ws1',
peer: 'c',
type: 'workspace',
});

await a.set({
key: 'test',
data: new Uint8Array([1, 2, 3, 4]),
mime: 'text/plain',
createdAt: new Date(100),
});

await c.set({
key: 'test2',
data: new Uint8Array([4, 3, 2, 1]),
mime: 'text/plain',
createdAt: new Date(100),
});

const peerA = new SpaceStorage([a]);
const peerB = new SpaceStorage([b]);
const peerC = new SpaceStorage([c]);

await peerA.connect();
await peerB.connect();
await peerC.connect();

const sync = new SyncEngine(peerA, [peerB, peerC]);
const abort = new AbortController();
sync.run(abort.signal);

await new Promise(resolve => setTimeout(resolve, 1000));

{
const a = await peerA.get('blob').get('test');
expect(a).not.toBeNull();
expect(a?.data).toEqual(new Uint8Array([1, 2, 3, 4]));
}

{
const b = await peerB.get('blob').get('test');
expect(b).not.toBeNull();
expect(b?.data).toEqual(new Uint8Array([1, 2, 3, 4]));
}

{
const c = await peerC.get('blob').get('test2');
expect(c).not.toBeNull();
expect(c?.data).toEqual(new Uint8Array([4, 3, 2, 1]));
}
});
8 changes: 6 additions & 2 deletions packages/common/nbstore/src/op/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,12 @@ export class SpaceStorageConsumer extends SpaceStorage {
}

private registerBlobHandlers(storage: BlobStorage) {
this.consumer.register('getBlob', storage.get.bind(storage));
this.consumer.register('setBlob', storage.set.bind(storage));
this.consumer.register('getBlob', key => {
return storage.get(key);
});
this.consumer.register('setBlob', blob => {
return storage.set(blob);
});
this.consumer.register('deleteBlob', ({ key, permanently }) => {
return storage.delete(key, permanently);
});
Expand Down
14 changes: 9 additions & 5 deletions packages/common/nbstore/src/storage/blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ export abstract class BlobStorage<
> extends Storage<Options> {
override readonly storageType = 'blob';

abstract get(key: string): Promise<BlobRecord | null>;
abstract set(blob: BlobRecord): Promise<void>;
abstract delete(key: string, permanently: boolean): Promise<void>;
abstract release(): Promise<void>;
abstract list(): Promise<ListedBlobRecord[]>;
abstract get(key: string, signal?: AbortSignal): Promise<BlobRecord | null>;
abstract set(blob: BlobRecord, signal?: AbortSignal): Promise<void>;
abstract delete(
key: string,
permanently: boolean,
signal?: AbortSignal
): Promise<void>;
abstract release(signal?: AbortSignal): Promise<void>;
abstract list(signal?: AbortSignal): Promise<ListedBlobRecord[]>;
}
89 changes: 89 additions & 0 deletions packages/common/nbstore/src/sync/blob/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { difference } from 'lodash-es';

import type { BlobStorage } from '../../storage';
import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted';

export class BlobSyncEngine {
constructor(
readonly local: BlobStorage,
readonly remotes: BlobStorage[]
) {}

private async sync(signal?: AbortSignal) {
throwIfAborted(signal);

for (const remote of this.remotes) {
let localList: string[] = [];
let remoteList: string[] = [];

try {
localList = (await this.local.list(signal)).map(b => b.key);
throwIfAborted(signal);
remoteList = (await remote.list(signal)).map(b => b.key);
throwIfAborted(signal);
} catch (err) {
if (err === MANUALLY_STOP) {
throw err;
}
console.error(`error when sync`, err);
continue;
}

const needUpload = difference(localList, remoteList);
for (const key of needUpload) {
try {
const data = await this.local.get(key, signal);
throwIfAborted(signal);
if (data) {
await remote.set(data, signal);
throwIfAborted(signal);
}
} catch (err) {
if (err === MANUALLY_STOP) {
throw err;
}
console.error(
`error when sync ${key} from [${this.local.peer}] to [${remote.peer}]`,
err
);
}
}

const needDownload = difference(remoteList, localList);

for (const key of needDownload) {
try {
const data = await remote.get(key, signal);
throwIfAborted(signal);
if (data) {
await this.local.set(data, signal);
throwIfAborted(signal);
}
} catch (err) {
if (err === MANUALLY_STOP) {
throw err;
}
console.error(
`error when sync ${key} from [${remote.peer}] to [${this.local.peer}]`,
err
);
}
}
}
}

async run(signal?: AbortSignal) {
if (signal?.aborted) {
return;
}

try {
await this.sync(signal);
} catch (error) {
if (error === MANUALLY_STOP) {
return;
}
console.error('sync blob error', error);
}
}
}
14 changes: 13 additions & 1 deletion packages/common/nbstore/src/sync/doc/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,19 @@ export class DocSyncPeer {

setPriority(docId: string, priority: number) {
this.prioritySettings.set(docId, priority);
this.status.jobDocQueue.updatePriority(docId, priority);
return this.status.jobDocQueue.setPriority(docId, priority);
}

addPriority(id: string, priority: number) {
const oldPriority = this.prioritySettings.get(id) ?? 0;
this.prioritySettings.set(id, priority);
this.status.jobDocQueue.setPriority(id, oldPriority + priority);

return () => {
const currentPriority = this.prioritySettings.get(id) ?? 0;
this.prioritySettings.set(id, currentPriority - priority);
this.status.jobDocQueue.setPriority(id, currentPriority - priority);
};
}

protected mergeUpdates(updates: Uint8Array[]) {
Expand Down
32 changes: 24 additions & 8 deletions packages/common/nbstore/src/sync/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { DocStorage, SpaceStorage } from '../storage';
import type { BlobStorage, DocStorage, SpaceStorage } from '../storage';
import { BlobSyncEngine } from './blob';
import { DocSyncEngine } from './doc';

export class SyncEngine {
Expand All @@ -9,15 +10,30 @@ export class SyncEngine {

async run(signal?: AbortSignal) {
const doc = this.local.tryGet('doc');
const blob = this.local.tryGet('blob');
const sync = this.local.tryGet('sync');

if (doc && sync) {
const peerDocs = this.peers
.map(peer => peer.tryGet('doc'))
.filter((v): v is DocStorage => !!v);
await Promise.allSettled([
(async () => {
if (doc && sync) {
const peerDocs = this.peers
.map(peer => peer.tryGet('doc'))
.filter((v): v is DocStorage => !!v);

const engine = new DocSyncEngine(doc, sync, peerDocs);
await engine.run(signal);
}
const engine = new DocSyncEngine(doc, sync, peerDocs);
await engine.run(signal);
}
})(),
(async () => {
if (blob) {
const peerBlobs = this.peers
.map(peer => peer.tryGet('blob'))
.filter((v): v is BlobStorage => !!v);

const engine = new BlobSyncEngine(blob, peerBlobs);
await engine.run(signal);
}
})(),
]);
}
}
2 changes: 1 addition & 1 deletion packages/common/nbstore/src/utils/priority-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export class PriorityQueue {
this.priorityMap.clear();
}

updatePriority(id: string, priority: number) {
setPriority(id: string, priority: number) {
if (this.remove(id)) {
this.push(id, priority);
}
Expand Down
Loading