Skip to content

Commit

Permalink
feat(nbstore): remove async on connection api (#9187)
Browse files Browse the repository at this point in the history
We should not use async on `connect` and `disconnect`, for `WebSocketConnection` will never connect when offline.

We should handle the connection status of each storage in sync, using the `connection.waitForConnect`

This PR also puts the connection reference count on the `connect` and disconnect`
  • Loading branch information
EYHN committed Dec 18, 2024
1 parent 3fddf05 commit 64b017d
Show file tree
Hide file tree
Showing 20 changed files with 157 additions and 223 deletions.
11 changes: 8 additions & 3 deletions packages/common/nbstore/src/__tests__/frontend.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ test('doc', async () => {
type: 'workspace',
});

await docStorage.connect();
docStorage.connect();

await docStorage.waitForConnected();

const frontend1 = new DocFrontend(docStorage, null);
frontend1.start();
Expand Down Expand Up @@ -66,8 +68,11 @@ test('awareness', async () => {
type: 'workspace',
});

await storage1.connect();
await storage2.connect();
storage1.connect();
storage2.connect();

await storage1.waitForConnected();
await storage2.waitForConnected();

// peer a
const docA = new YDoc({ guid: 'test-doc' });
Expand Down
30 changes: 19 additions & 11 deletions packages/common/nbstore/src/__tests__/sync.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ test('doc', async () => {
const peerB = new SpaceStorage([peerBDoc]);
const peerC = new SpaceStorage([peerCDoc]);

await peerA.connect();
await peerB.connect();
await peerC.connect();
peerA.connect();
peerB.connect();
peerC.connect();

await peerA.waitForConnected();
await peerB.waitForConnected();
await peerC.waitForConnected();

await peerA.get('doc').pushDocUpdate({
docId: 'doc1',
Expand Down Expand Up @@ -121,6 +125,18 @@ test('blob', async () => {
type: 'workspace',
});

const peerA = new SpaceStorage([a]);
const peerB = new SpaceStorage([b]);
const peerC = new SpaceStorage([c]);

peerA.connect();
peerB.connect();
peerC.connect();

await peerA.waitForConnected();
await peerB.waitForConnected();
await peerC.waitForConnected();

await a.set({
key: 'test',
data: new Uint8Array([1, 2, 3, 4]),
Expand All @@ -135,14 +151,6 @@ test('blob', async () => {
createdAt: new Date(100),
});

const peerA = new SpaceStorage([a]);
const peerB = new SpaceStorage([b]);
const peerC = new SpaceStorage([c]);

await peerA.connect();
await peerB.connect();
await peerC.connect();

const sync = new Sync(peerA, [peerB, peerC]);
sync.start();

Expand Down
82 changes: 51 additions & 31 deletions packages/common/nbstore/src/connection/connection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import EventEmitter2 from 'eventemitter2';
import { throttle } from 'lodash-es';

export type ConnectionStatus =
| 'idle'
Expand All @@ -13,6 +14,8 @@ export abstract class Connection<T = any> {
private _status: ConnectionStatus = 'idle';
protected error?: Error;
private refCount = 0;
private _enableAutoReconnect = false;
private connectingAbort?: AbortController;

constructor() {
this.autoReconnect();
Expand Down Expand Up @@ -45,63 +48,80 @@ export abstract class Connection<T = any> {
}

protected setStatus(status: ConnectionStatus, error?: Error) {
const shouldEmit = status !== this._status && error !== this.error;
const shouldEmit = status !== this._status || error !== this.error;
this._status = status;
this.error = error;
if (shouldEmit) {
this.emitStatusChanged(status, error);
}
}

abstract doConnect(): Promise<T>;
abstract doDisconnect(conn: T): Promise<void>;
protected abstract doConnect(signal?: AbortSignal): Promise<T>;
protected abstract doDisconnect(conn: T): void;

ref() {
this.refCount++;
}

deref() {
this.refCount = Math.max(0, this.refCount - 1);
}

async connect() {
private innerConnect() {
if (this.status === 'idle' || this.status === 'error') {
this._enableAutoReconnect = true;
this.setStatus('connecting');
try {
this._inner = await this.doConnect();
this.setStatus('connected');
} catch (error) {
this.setStatus('error', error as any);
}
this.connectingAbort = new AbortController();
this.doConnect(this.connectingAbort.signal)
.then(value => {
if (!this.connectingAbort?.signal.aborted) {
this.setStatus('connected');
this._inner = value;
} else {
try {
this.doDisconnect(value);
} catch (error) {
console.error('failed to disconnect', error);
}
}
})
.catch(error => {
if (!this.connectingAbort?.signal.aborted) {
this.setStatus('error', error as any);
}
});
}
}

async disconnect() {
this.deref();
if (this.refCount > 0) {
return;
connect() {
this.refCount++;
if (this.refCount === 1) {
this.innerConnect();
}
}

if (this.status === 'connected') {
disconnect() {
this.refCount--;
if (this.refCount === 0) {
this._enableAutoReconnect = false;
this.connectingAbort?.abort();
try {
if (this._inner) {
await this.doDisconnect(this._inner);
this._inner = null;
this.doDisconnect(this._inner);
}
this.setStatus('closed');
} catch (error) {
this.setStatus('error', error as any);
console.error('failed to disconnect', error);
}
this.setStatus('closed');
this._inner = null;
}
}

private autoReconnect() {
// TODO:
// - maximum retry count
// - dynamic sleep time (attempt < 3 ? 1s : 1min)?
this.onStatusChanged(() => {
this.connect().catch(() => {});
});
this.onStatusChanged(
throttle(() => {
() => {
if (this._enableAutoReconnect) {
this.innerConnect();
}
};
}, 1000)
);
}

waitForConnected(signal?: AbortSignal) {
Expand Down Expand Up @@ -146,6 +166,6 @@ export class DummyConnection extends Connection<undefined> {
}

doDisconnect() {
return Promise.resolve(undefined);
return;
}
}
2 changes: 0 additions & 2 deletions packages/common/nbstore/src/connection/shared-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ export function share<T extends Connection<any>>(conn: T): T {
const existing = CONNECTIONS.get(conn.shareId);

if (existing) {
existing.ref();
return existing as T;
}

CONNECTIONS.set(conn.shareId, conn);
conn.ref();

return conn;
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export class BroadcastChannelConnection extends Connection<BroadcastChannel> {
return new BroadcastChannel(this.channelName);
}

override async doDisconnect() {
override doDisconnect() {
this.close();
}

Expand Down
5 changes: 1 addition & 4 deletions packages/common/nbstore/src/impls/cloud/awareness.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ export class CloudAwarenessStorage extends AwarenessStorage<CloudAwarenessStorag
return this.connection.inner;
}

override async connect(): Promise<void> {
await super.connect();
}

override async update(record: AwarenessRecord): Promise<void> {
const encodedUpdate = await uint8ArrayToBase64(record.bin);
this.socket.emit('space:update-awareness', {
Expand All @@ -44,6 +40,7 @@ export class CloudAwarenessStorage extends AwarenessStorage<CloudAwarenessStorag
onUpdate: (update: AwarenessRecord, origin?: string) => void,
onCollect: () => AwarenessRecord
): () => void {
// TODO: handle disconnect
// leave awareness
const leave = () => {
this.socket.emit('space:leave-awareness', {
Expand Down
33 changes: 21 additions & 12 deletions packages/common/nbstore/src/impls/cloud/doc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,38 @@ export class CloudDocStorage extends DocStorage<CloudDocStorageOptions> {
new SocketConnection(this.peer, this.options.socketOptions)
);

private disposeConnectionStatusListener?: () => void;

private get socket() {
return this.connection.inner;
}

override async connect(): Promise<void> {
await super.connect();
this.connection.onStatusChanged(status => {
if (status === 'connected') {
this.join().catch(err => {
console.error('doc storage join failed', err);
});
this.socket.on('space:broadcast-doc-update', this.onServerUpdate);
}
});
override connect() {
if (!this.disposeConnectionStatusListener) {
this.disposeConnectionStatusListener = this.connection.onStatusChanged(
status => {
if (status === 'connected') {
this.join().catch(err => {
console.error('doc storage join failed', err);
});
this.socket.on('space:broadcast-doc-update', this.onServerUpdate);
}
}
);
}
super.connect();
}

override async disconnect(): Promise<void> {
override disconnect() {
if (this.disposeConnectionStatusListener) {
this.disposeConnectionStatusListener();
}
this.socket.emit('space:leave', {
spaceType: this.spaceType,
spaceId: this.spaceId,
});
this.socket.off('space:broadcast-doc-update', this.onServerUpdate);
await super.connect();
super.disconnect();
}

async join() {
Expand Down
2 changes: 1 addition & 1 deletion packages/common/nbstore/src/impls/cloud/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ export class SocketConnection extends Connection<Socket> {
return conn;
}

override async doDisconnect(conn: Socket) {
override doDisconnect(conn: Socket) {
conn.close();
}

Expand Down
17 changes: 8 additions & 9 deletions packages/common/nbstore/src/impls/idb/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ export class IDBConnection extends Connection<{
blocking: () => {
// if, for example, an tab with newer version is opened, this function will be called.
// we should close current connection to allow the new version to upgrade the db.
this.close(
this.setStatus(
'closed',
new Error('Blocking a new version. Closing the connection.')
);
},
Expand All @@ -38,13 +39,11 @@ export class IDBConnection extends Connection<{
};
}

override async doDisconnect() {
this.close();
}

private close(error?: Error) {
this.maybeConnection?.channel.close();
this.maybeConnection?.db.close();
this.setStatus('closed', error);
override doDisconnect(db: {
db: IDBPDatabase<DocStorageSchema>;
channel: BroadcastChannel;
}) {
db.channel.close();
db.db.close();
}
}
3 changes: 1 addition & 2 deletions packages/common/nbstore/src/impls/idb/doc.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { share } from '../../connection';
import {
type DocClock,
type DocClocks,
Expand All @@ -17,7 +16,7 @@ interface ChannelMessage {
}

export class IndexedDBDocStorage extends DocStorage {
readonly connection = share(new IDBConnection(this.options));
readonly connection = new IDBConnection(this.options);

get db() {
return this.connection.inner.db;
Expand Down
4 changes: 2 additions & 2 deletions packages/common/nbstore/src/impls/idb/v1/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class DocIDBConnection extends Connection<IDBPDatabase<DocDBSchema>> {
});
}

override async doDisconnect(conn: IDBPDatabase<DocDBSchema>) {
override doDisconnect(conn: IDBPDatabase<DocDBSchema>) {
conn.close();
}
}
Expand Down Expand Up @@ -57,7 +57,7 @@ export class BlobIDBConnection extends Connection<IDBPDatabase<BlobDBSchema>> {
});
}

override async doDisconnect(conn: IDBPDatabase<BlobDBSchema>) {
override doDisconnect(conn: IDBPDatabase<BlobDBSchema>) {
conn.close();
}
}
Loading

0 comments on commit 64b017d

Please sign in to comment.