From 64b017dc1b47d7bc805595b04b74ad4d8512df8d Mon Sep 17 00:00:00 2001 From: EYHN Date: Wed, 18 Dec 2024 03:59:49 +0000 Subject: [PATCH] feat(nbstore): remove async on connection api (#9187) We should not use async on `connect` and `disconnect`, for `WebSocketConnection` will never connect when offline. We should handle the connection status of each storage in sync, using the `connection.waitForConnect` This PR also puts the connection reference count on the `connect` and disconnect` --- .../nbstore/src/__tests__/frontend.spec.ts | 11 ++- .../common/nbstore/src/__tests__/sync.spec.ts | 30 ++++--- .../nbstore/src/connection/connection.ts | 82 ++++++++++++------- .../src/connection/shared-connection.ts | 2 - .../src/impls/broadcast-channel/channel.ts | 2 +- .../nbstore/src/impls/cloud/awareness.ts | 5 +- .../common/nbstore/src/impls/cloud/doc.ts | 33 +++++--- .../common/nbstore/src/impls/cloud/socket.ts | 2 +- packages/common/nbstore/src/impls/idb/db.ts | 17 ++-- packages/common/nbstore/src/impls/idb/doc.ts | 3 +- .../common/nbstore/src/impls/idb/v1/db.ts | 4 +- .../common/nbstore/src/impls/sqlite/db.ts | 25 ++---- packages/common/nbstore/src/op/consumer.ts | 9 -- packages/common/nbstore/src/storage/index.ts | 61 ++++---------- .../common/nbstore/src/storage/storage.ts | 12 ++- .../apps/electron/src/helper/exposed.ts | 8 +- .../apps/electron/src/helper/nbstore/db.ts | 12 ++- .../electron/src/helper/nbstore/handlers.ts | 19 +---- .../apps/electron/src/helper/nbstore/index.ts | 2 +- .../electron/src/helper/nbstore/storage.ts | 41 +--------- 20 files changed, 157 insertions(+), 223 deletions(-) diff --git a/packages/common/nbstore/src/__tests__/frontend.spec.ts b/packages/common/nbstore/src/__tests__/frontend.spec.ts index 4a00fbae581f4..cedd7ad903150 100644 --- a/packages/common/nbstore/src/__tests__/frontend.spec.ts +++ b/packages/common/nbstore/src/__tests__/frontend.spec.ts @@ -23,7 +23,9 @@ test('doc', async () => { type: 'workspace', }); - await docStorage.connect(); + docStorage.connect(); + + await docStorage.waitForConnected(); const frontend1 = new DocFrontend(docStorage, null); frontend1.start(); @@ -66,8 +68,11 @@ test('awareness', async () => { type: 'workspace', }); - await storage1.connect(); - await storage2.connect(); + storage1.connect(); + storage2.connect(); + + await storage1.waitForConnected(); + await storage2.waitForConnected(); // peer a const docA = new YDoc({ guid: 'test-doc' }); diff --git a/packages/common/nbstore/src/__tests__/sync.spec.ts b/packages/common/nbstore/src/__tests__/sync.spec.ts index 95ae54032d367..48aacff5cffb0 100644 --- a/packages/common/nbstore/src/__tests__/sync.spec.ts +++ b/packages/common/nbstore/src/__tests__/sync.spec.ts @@ -44,9 +44,13 @@ test('doc', async () => { const peerB = new SpaceStorage([peerBDoc]); const peerC = new SpaceStorage([peerCDoc]); - await peerA.connect(); - await peerB.connect(); - await peerC.connect(); + peerA.connect(); + peerB.connect(); + peerC.connect(); + + await peerA.waitForConnected(); + await peerB.waitForConnected(); + await peerC.waitForConnected(); await peerA.get('doc').pushDocUpdate({ docId: 'doc1', @@ -121,6 +125,18 @@ test('blob', async () => { type: 'workspace', }); + const peerA = new SpaceStorage([a]); + const peerB = new SpaceStorage([b]); + const peerC = new SpaceStorage([c]); + + peerA.connect(); + peerB.connect(); + peerC.connect(); + + await peerA.waitForConnected(); + await peerB.waitForConnected(); + await peerC.waitForConnected(); + await a.set({ key: 'test', data: new Uint8Array([1, 2, 3, 4]), @@ -135,14 +151,6 @@ test('blob', async () => { createdAt: new Date(100), }); - const peerA = new SpaceStorage([a]); - const peerB = new SpaceStorage([b]); - const peerC = new SpaceStorage([c]); - - await peerA.connect(); - await peerB.connect(); - await peerC.connect(); - const sync = new Sync(peerA, [peerB, peerC]); sync.start(); diff --git a/packages/common/nbstore/src/connection/connection.ts b/packages/common/nbstore/src/connection/connection.ts index e5f808214b7d2..0df9900521433 100644 --- a/packages/common/nbstore/src/connection/connection.ts +++ b/packages/common/nbstore/src/connection/connection.ts @@ -1,4 +1,5 @@ import EventEmitter2 from 'eventemitter2'; +import { throttle } from 'lodash-es'; export type ConnectionStatus = | 'idle' @@ -13,6 +14,8 @@ export abstract class Connection { private _status: ConnectionStatus = 'idle'; protected error?: Error; private refCount = 0; + private _enableAutoReconnect = false; + private connectingAbort?: AbortController; constructor() { this.autoReconnect(); @@ -45,7 +48,7 @@ export abstract class Connection { } protected setStatus(status: ConnectionStatus, error?: Error) { - const shouldEmit = status !== this._status && error !== this.error; + const shouldEmit = status !== this._status || error !== this.error; this._status = status; this.error = error; if (shouldEmit) { @@ -53,45 +56,56 @@ export abstract class Connection { } } - abstract doConnect(): Promise; - abstract doDisconnect(conn: T): Promise; + protected abstract doConnect(signal?: AbortSignal): Promise; + protected abstract doDisconnect(conn: T): void; - ref() { - this.refCount++; - } - - deref() { - this.refCount = Math.max(0, this.refCount - 1); - } - - async connect() { + private innerConnect() { if (this.status === 'idle' || this.status === 'error') { + this._enableAutoReconnect = true; this.setStatus('connecting'); - try { - this._inner = await this.doConnect(); - this.setStatus('connected'); - } catch (error) { - this.setStatus('error', error as any); - } + this.connectingAbort = new AbortController(); + this.doConnect(this.connectingAbort.signal) + .then(value => { + if (!this.connectingAbort?.signal.aborted) { + this.setStatus('connected'); + this._inner = value; + } else { + try { + this.doDisconnect(value); + } catch (error) { + console.error('failed to disconnect', error); + } + } + }) + .catch(error => { + if (!this.connectingAbort?.signal.aborted) { + this.setStatus('error', error as any); + } + }); } } - async disconnect() { - this.deref(); - if (this.refCount > 0) { - return; + connect() { + this.refCount++; + if (this.refCount === 1) { + this.innerConnect(); } + } - if (this.status === 'connected') { + disconnect() { + this.refCount--; + if (this.refCount === 0) { + this._enableAutoReconnect = false; + this.connectingAbort?.abort(); try { if (this._inner) { - await this.doDisconnect(this._inner); - this._inner = null; + this.doDisconnect(this._inner); } - this.setStatus('closed'); } catch (error) { - this.setStatus('error', error as any); + console.error('failed to disconnect', error); } + this.setStatus('closed'); + this._inner = null; } } @@ -99,9 +113,15 @@ export abstract class Connection { // TODO: // - maximum retry count // - dynamic sleep time (attempt < 3 ? 1s : 1min)? - this.onStatusChanged(() => { - this.connect().catch(() => {}); - }); + this.onStatusChanged( + throttle(() => { + () => { + if (this._enableAutoReconnect) { + this.innerConnect(); + } + }; + }, 1000) + ); } waitForConnected(signal?: AbortSignal) { @@ -146,6 +166,6 @@ export class DummyConnection extends Connection { } doDisconnect() { - return Promise.resolve(undefined); + return; } } diff --git a/packages/common/nbstore/src/connection/shared-connection.ts b/packages/common/nbstore/src/connection/shared-connection.ts index 3ef9e1b165e95..da69e849050e1 100644 --- a/packages/common/nbstore/src/connection/shared-connection.ts +++ b/packages/common/nbstore/src/connection/shared-connection.ts @@ -11,12 +11,10 @@ export function share>(conn: T): T { const existing = CONNECTIONS.get(conn.shareId); if (existing) { - existing.ref(); return existing as T; } CONNECTIONS.set(conn.shareId, conn); - conn.ref(); return conn; } diff --git a/packages/common/nbstore/src/impls/broadcast-channel/channel.ts b/packages/common/nbstore/src/impls/broadcast-channel/channel.ts index e3d7b1327bef2..cd40bd7f21e34 100644 --- a/packages/common/nbstore/src/impls/broadcast-channel/channel.ts +++ b/packages/common/nbstore/src/impls/broadcast-channel/channel.ts @@ -12,7 +12,7 @@ export class BroadcastChannelConnection extends Connection { return new BroadcastChannel(this.channelName); } - override async doDisconnect() { + override doDisconnect() { this.close(); } diff --git a/packages/common/nbstore/src/impls/cloud/awareness.ts b/packages/common/nbstore/src/impls/cloud/awareness.ts index f15b8916293b9..be9a56ef45720 100644 --- a/packages/common/nbstore/src/impls/cloud/awareness.ts +++ b/packages/common/nbstore/src/impls/cloud/awareness.ts @@ -25,10 +25,6 @@ export class CloudAwarenessStorage extends AwarenessStorage { - await super.connect(); - } - override async update(record: AwarenessRecord): Promise { const encodedUpdate = await uint8ArrayToBase64(record.bin); this.socket.emit('space:update-awareness', { @@ -44,6 +40,7 @@ export class CloudAwarenessStorage extends AwarenessStorage void, onCollect: () => AwarenessRecord ): () => void { + // TODO: handle disconnect // leave awareness const leave = () => { this.socket.emit('space:leave-awareness', { diff --git a/packages/common/nbstore/src/impls/cloud/doc.ts b/packages/common/nbstore/src/impls/cloud/doc.ts index 6dd855ae521fa..7be0d02cb289a 100644 --- a/packages/common/nbstore/src/impls/cloud/doc.ts +++ b/packages/common/nbstore/src/impls/cloud/doc.ts @@ -24,29 +24,38 @@ export class CloudDocStorage extends DocStorage { new SocketConnection(this.peer, this.options.socketOptions) ); + private disposeConnectionStatusListener?: () => void; + private get socket() { return this.connection.inner; } - override async connect(): Promise { - await super.connect(); - this.connection.onStatusChanged(status => { - if (status === 'connected') { - this.join().catch(err => { - console.error('doc storage join failed', err); - }); - this.socket.on('space:broadcast-doc-update', this.onServerUpdate); - } - }); + override connect() { + if (!this.disposeConnectionStatusListener) { + this.disposeConnectionStatusListener = this.connection.onStatusChanged( + status => { + if (status === 'connected') { + this.join().catch(err => { + console.error('doc storage join failed', err); + }); + this.socket.on('space:broadcast-doc-update', this.onServerUpdate); + } + } + ); + } + super.connect(); } - override async disconnect(): Promise { + override disconnect() { + if (this.disposeConnectionStatusListener) { + this.disposeConnectionStatusListener(); + } this.socket.emit('space:leave', { spaceType: this.spaceType, spaceId: this.spaceId, }); this.socket.off('space:broadcast-doc-update', this.onServerUpdate); - await super.connect(); + super.disconnect(); } async join() { diff --git a/packages/common/nbstore/src/impls/cloud/socket.ts b/packages/common/nbstore/src/impls/cloud/socket.ts index de80a479912b1..79d31057ce2bd 100644 --- a/packages/common/nbstore/src/impls/cloud/socket.ts +++ b/packages/common/nbstore/src/impls/cloud/socket.ts @@ -184,7 +184,7 @@ export class SocketConnection extends Connection { return conn; } - override async doDisconnect(conn: Socket) { + override doDisconnect(conn: Socket) { conn.close(); } diff --git a/packages/common/nbstore/src/impls/idb/db.ts b/packages/common/nbstore/src/impls/idb/db.ts index 60129fe1515b5..52d3d088c8932 100644 --- a/packages/common/nbstore/src/impls/idb/db.ts +++ b/packages/common/nbstore/src/impls/idb/db.ts @@ -25,7 +25,8 @@ export class IDBConnection extends Connection<{ blocking: () => { // if, for example, an tab with newer version is opened, this function will be called. // we should close current connection to allow the new version to upgrade the db. - this.close( + this.setStatus( + 'closed', new Error('Blocking a new version. Closing the connection.') ); }, @@ -38,13 +39,11 @@ export class IDBConnection extends Connection<{ }; } - override async doDisconnect() { - this.close(); - } - - private close(error?: Error) { - this.maybeConnection?.channel.close(); - this.maybeConnection?.db.close(); - this.setStatus('closed', error); + override doDisconnect(db: { + db: IDBPDatabase; + channel: BroadcastChannel; + }) { + db.channel.close(); + db.db.close(); } } diff --git a/packages/common/nbstore/src/impls/idb/doc.ts b/packages/common/nbstore/src/impls/idb/doc.ts index 8451c831d941f..b188b5b84b82b 100644 --- a/packages/common/nbstore/src/impls/idb/doc.ts +++ b/packages/common/nbstore/src/impls/idb/doc.ts @@ -1,4 +1,3 @@ -import { share } from '../../connection'; import { type DocClock, type DocClocks, @@ -17,7 +16,7 @@ interface ChannelMessage { } export class IndexedDBDocStorage extends DocStorage { - readonly connection = share(new IDBConnection(this.options)); + readonly connection = new IDBConnection(this.options); get db() { return this.connection.inner.db; diff --git a/packages/common/nbstore/src/impls/idb/v1/db.ts b/packages/common/nbstore/src/impls/idb/v1/db.ts index 6497e1b6992e2..b12dbf6d0461c 100644 --- a/packages/common/nbstore/src/impls/idb/v1/db.ts +++ b/packages/common/nbstore/src/impls/idb/v1/db.ts @@ -28,7 +28,7 @@ export class DocIDBConnection extends Connection> { }); } - override async doDisconnect(conn: IDBPDatabase) { + override doDisconnect(conn: IDBPDatabase) { conn.close(); } } @@ -57,7 +57,7 @@ export class BlobIDBConnection extends Connection> { }); } - override async doDisconnect(conn: IDBPDatabase) { + override doDisconnect(conn: IDBPDatabase) { conn.close(); } } diff --git a/packages/common/nbstore/src/impls/sqlite/db.ts b/packages/common/nbstore/src/impls/sqlite/db.ts index f1f4d4fc15889..e7f94a1e891fe 100644 --- a/packages/common/nbstore/src/impls/sqlite/db.ts +++ b/packages/common/nbstore/src/impls/sqlite/db.ts @@ -1,6 +1,6 @@ -import { apis, events } from '@affine/electron-api'; +import { apis } from '@affine/electron-api'; -import { Connection, type ConnectionStatus } from '../../connection'; +import { Connection } from '../../connection'; import { type SpaceType, universalId } from '../../storage'; type NativeDBApis = NonNullable['nbstore'] extends infer APIs @@ -27,7 +27,6 @@ export class NativeDBConnection extends Connection { } this.apis = this.bindApis(apis.nbstore); - this.listenToConnectionEvents(); } override get shareId(): string { @@ -63,21 +62,9 @@ export class NativeDBConnection extends Connection { await this.apis.connect(); } - override async doDisconnect() { - await this.apis.close(); - } - - private listenToConnectionEvents() { - events?.nbstore.onConnectionStatusChanged( - ({ peer, spaceType, spaceId, status, error }) => { - if ( - peer === this.peer && - spaceType === this.type && - spaceId === this.id - ) { - this.setStatus(status as ConnectionStatus, error); - } - } - ); + override doDisconnect() { + this.apis.close().catch(err => { + console.error('NativeDBConnection close failed', err); + }); } } diff --git a/packages/common/nbstore/src/op/consumer.ts b/packages/common/nbstore/src/op/consumer.ts index 67b45ff901fd0..812af778ed48a 100644 --- a/packages/common/nbstore/src/op/consumer.ts +++ b/packages/common/nbstore/src/op/consumer.ts @@ -42,15 +42,6 @@ export class SpaceStorageConsumer extends SpaceStorage { }); this.consumer.register('connect', this.connect.bind(this)); this.consumer.register('disconnect', this.disconnect.bind(this)); - this.consumer.register('connection', () => { - return new Observable(subscriber => { - subscriber.add( - this.on('connection', payload => { - subscriber.next(payload); - }) - ); - }); - }); this.consumer.register('destroy', this.destroy.bind(this)); } diff --git a/packages/common/nbstore/src/storage/index.ts b/packages/common/nbstore/src/storage/index.ts index 7d7447627314b..46da1f065c99b 100644 --- a/packages/common/nbstore/src/storage/index.ts +++ b/packages/common/nbstore/src/storage/index.ts @@ -1,6 +1,5 @@ import EventEmitter2 from 'eventemitter2'; -import type { ConnectionStatus } from '../connection'; import type { AwarenessStorage } from './awareness'; import type { BlobStorage } from './blob'; import type { DocStorage } from './doc'; @@ -39,59 +38,27 @@ export class SpaceStorage { return storage as Extract; } - async connect() { - await Promise.allSettled( - Array.from(this.storages.values()).map(async storage => { - // FIXME: multiple calls will register multiple listeners - this.disposables.add( - storage.connection.onStatusChanged((status, error) => { - this.event.emit('connection', { - storage: storage.storageType, - status, - error, - }); - }) - ); - await storage.connect(); - }) - ); + connect() { + Array.from(this.storages.values()).forEach(storage => { + storage.connect(); + }); } - async disconnect() { - await Promise.allSettled( - Array.from(this.storages.values()).map(async storage => { - await storage.disconnect(); - }) - ); + disconnect() { + Array.from(this.storages.values()).forEach(storage => { + storage.disconnect(); + }); } - on( - event: 'connection', - cb: (payload: { - storage: StorageType; - status: ConnectionStatus; - error?: Error; - }) => void - ): () => void { - this.event.on(event, cb); - return () => { - this.event.off(event, cb); - }; - } - - off( - event: 'connection', - cb: (payload: { - storage: StorageType; - status: ConnectionStatus; - error?: Error; - }) => void - ): void { - this.event.off(event, cb); + async waitForConnected() { + await Promise.all( + Array.from(this.storages.values()).map(storage => + storage.waitForConnected() + ) + ); } async destroy() { - await this.disconnect(); this.disposables.forEach(disposable => disposable()); this.event.removeAllListeners(); this.storages.clear(); diff --git a/packages/common/nbstore/src/storage/storage.ts b/packages/common/nbstore/src/storage/storage.ts index be37afcbdd4fa..b1c3ac7e27ed4 100644 --- a/packages/common/nbstore/src/storage/storage.ts +++ b/packages/common/nbstore/src/storage/storage.ts @@ -102,11 +102,15 @@ export abstract class Storage { constructor(public readonly options: Opts) {} - async connect() { - await this.connection.connect(); + connect() { + this.connection.connect(); } - async disconnect() { - await this.connection.disconnect(); + disconnect() { + this.connection.disconnect(); + } + + async waitForConnected() { + await this.connection.waitForConnected(); } } diff --git a/packages/frontend/apps/electron/src/helper/exposed.ts b/packages/frontend/apps/electron/src/helper/exposed.ts index effa818c29c51..9fa9ad05ff9ab 100644 --- a/packages/frontend/apps/electron/src/helper/exposed.ts +++ b/packages/frontend/apps/electron/src/helper/exposed.ts @@ -1,10 +1,5 @@ import { dialogHandlers } from './dialog'; -import { - dbEventsV1, - dbHandlersV1, - nbstoreEvents, - nbstoreHandlers, -} from './nbstore'; +import { dbEventsV1, dbHandlersV1, nbstoreHandlers } from './nbstore'; import { provideExposed } from './provide'; import { workspaceEvents, workspaceHandlers } from './workspace'; @@ -18,7 +13,6 @@ export const handlers = { export const events = { db: dbEventsV1, workspace: workspaceEvents, - nbstore: nbstoreEvents, }; const getExposedMeta = () => { diff --git a/packages/frontend/apps/electron/src/helper/nbstore/db.ts b/packages/frontend/apps/electron/src/helper/nbstore/db.ts index 5fd20214f7791..af03271a746f7 100644 --- a/packages/frontend/apps/electron/src/helper/nbstore/db.ts +++ b/packages/frontend/apps/electron/src/helper/nbstore/db.ts @@ -33,8 +33,14 @@ export class NativeDBConnection extends Connection { return conn; } - override async doDisconnect(conn: NativeDocStorage) { - await conn.close(); - logger.info('[nbstore] connection closed', this.shareId); + override doDisconnect(conn: NativeDocStorage) { + conn + .close() + .then(() => { + logger.info('[nbstore] connection closed', this.shareId); + }) + .catch(err => { + logger.error('[nbstore] connection close failed', this.shareId, err); + }); } } diff --git a/packages/frontend/apps/electron/src/helper/nbstore/handlers.ts b/packages/frontend/apps/electron/src/helper/nbstore/handlers.ts index f05432b6b7ef6..946cb79cf5041 100644 --- a/packages/frontend/apps/electron/src/helper/nbstore/handlers.ts +++ b/packages/frontend/apps/electron/src/helper/nbstore/handlers.ts @@ -4,13 +4,7 @@ import { type DocUpdate, } from '@affine/nbstore'; -import type { MainEventRegister } from '../type'; -import { - type ConnectionStatus, - ensureStorage, - getStorage, - onConnectionChanged, -} from './storage'; +import { ensureStorage, getStorage } from './storage'; export const nbstoreHandlers = { connect: async (id: string) => { @@ -21,7 +15,7 @@ export const nbstoreHandlers = { const store = getStorage(id); if (store) { - await store.disconnect(); + store.disconnect(); // The store may be shared with other tabs, so we don't delete it from cache // the underlying connection will handle the close correctly // STORE_CACHE.delete(`${spaceType}:${spaceId}`); @@ -132,12 +126,3 @@ export const nbstoreHandlers = { return store.get('sync').clearClocks(); }, }; - -export const nbstoreEvents = { - onConnectionStatusChanged: (fn: (payload: ConnectionStatus) => void) => { - const sub = onConnectionChanged(fn); - return () => { - sub.unsubscribe(); - }; - }, -} satisfies Record; diff --git a/packages/frontend/apps/electron/src/helper/nbstore/index.ts b/packages/frontend/apps/electron/src/helper/nbstore/index.ts index 1bdd0b736ddc6..860899542964f 100644 --- a/packages/frontend/apps/electron/src/helper/nbstore/index.ts +++ b/packages/frontend/apps/electron/src/helper/nbstore/index.ts @@ -1,4 +1,4 @@ -export { nbstoreEvents, nbstoreHandlers } from './handlers'; +export { nbstoreHandlers } from './handlers'; export * from './storage'; export { dbEvents as dbEventsV1, dbHandlers as dbHandlersV1 } from './v1'; export { universalId } from '@affine/nbstore'; diff --git a/packages/frontend/apps/electron/src/helper/nbstore/storage.ts b/packages/frontend/apps/electron/src/helper/nbstore/storage.ts index dbec1d1098257..4e4f2b222fbe5 100644 --- a/packages/frontend/apps/electron/src/helper/nbstore/storage.ts +++ b/packages/frontend/apps/electron/src/helper/nbstore/storage.ts @@ -1,10 +1,4 @@ -import { - parseUniversalId, - SpaceStorage, - type SpaceType, - type StorageType, -} from '@affine/nbstore'; -import { Subject } from 'rxjs'; +import { parseUniversalId, SpaceStorage } from '@affine/nbstore'; import { applyUpdate, Doc as YDoc } from 'yjs'; import { logger } from '../logger'; @@ -57,18 +51,8 @@ export class SqliteSpaceStorage extends SpaceStorage { } const STORE_CACHE = new Map(); -export interface ConnectionStatus { - peer: string; - spaceType: SpaceType; - spaceId: string; - storage: StorageType; - status: string; - error?: Error; -} -const CONNECTION$ = new Subject(); process.on('beforeExit', () => { - CONNECTION$.complete(); STORE_CACHE.forEach(store => { store.destroy().catch(err => { logger.error('[nbstore] destroy store failed', err); @@ -76,10 +60,6 @@ process.on('beforeExit', () => { }); }); -export function onConnectionChanged(fn: (payload: ConnectionStatus) => void) { - return CONNECTION$.subscribe({ next: fn }); -} - export function getStorage(universalId: string) { return STORE_CACHE.get(universalId); } @@ -101,24 +81,9 @@ export async function ensureStorage(universalId: string) { new SqliteSyncStorage(opts), ]); - store.on('connection', ({ storage, status, error }) => { - CONNECTION$.next({ - peer, - spaceType: type, - spaceId: id, - storage, - status, - error, - }); - logger.info( - `[nbstore] status changed: ${status}, spaceType: ${type}, spaceId: ${id}, storage: ${storage}` - ); - if (error) { - logger.error(`[nbstore] connection error: ${error}`); - } - }); + store.connect(); - await store.connect(); + await store.waitForConnected(); STORE_CACHE.set(universalId, store); }