Skip to content

Commit

Permalink
refactor(doc-storage): operation pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Oct 28, 2024
1 parent b160b2b commit 5fff4e0
Show file tree
Hide file tree
Showing 19 changed files with 782 additions and 348 deletions.
97 changes: 0 additions & 97 deletions packages/common/doc-storage/README.md

This file was deleted.

5 changes: 2 additions & 3 deletions packages/common/doc-storage/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
"private": true,
"sideEffects": false,
"exports": {
".": "./index.ts",
"./impls/*": "./impls/*",
"./storage": "./storage/index.ts"
".": "./index.ts"
},
"dependencies": {
"@affine/graphql": "workspace:*",
"@affine/native": "workspace:*",
"eventemitter2": "^6.4.9",
"idb": "^8.0.0",
"lodash-es": "^4.17.21",
"socket.io-client": "^4.7.5",
Expand Down
67 changes: 67 additions & 0 deletions packages/common/doc-storage/src/client/backend.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { OpConsumer } from '../op';

export interface PeerStorageOptions {
storages: Array<{
type: string; // 'doc' | 'history' | 'blob' | 'sync' | 'awareness'
impl: string;
opts: any;
}>;
}

interface PeerStorageBackendOptions extends PeerStorageOptions {
port: MessagePort;
}

export class PeerStorageBackend extends OpConsumer {
storages: Record<string, any> = {};
constructor(protected readonly opts: PeerStorageBackendOptions) {
super(opts.port);
this.register('connect', this.connect.bind(this));
}

async connect() {
await Promise.all(
this.opts.storages.map(async impl => {
const storage = new StorageImplementations[impl.impl](impl.opts);

Check failure on line 25 in packages/common/doc-storage/src/client/backend.ts

View workflow job for this annotation

GitHub Actions / Lint

Cannot find name 'StorageImplementations'.
await storage.connect();
storage.register(this.port);
this.storages[impl.type] = storage;
})
);
}

async disconnect() {
await Promise.all(
Object.values(this.storages).map(async storage => {
storage.unregister(this.port);
await storage.disconnect();
})
);
this.storages = {};
}
}

export class PeerWorkerStorageBackend extends PeerStorageBackend {
constructor(opts: PeerStorageBackendOptions) {
super(opts);
this.register('connect', this.connect.bind(this));
this.register('disconnect', this.disconnect.bind(this));
}

override async connect() {
// const worker = await getAndInitWorkerInSomewhere();
// the worker should proxy all 'op' messages to it's true backend
// worker.postMessage(
// {
// type: 'create-storage-worker-backend',
// storages: this.opts.storages,
// port: this.port,
// },
// [
// // transfer ownership of consumer port to worker,
// // this port is no longer usable in main thread
// this.port,
// ]
// );
}
}
52 changes: 52 additions & 0 deletions packages/common/doc-storage/src/client/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { ConnectOp, DisconnectOp, OpProducer } from '../op';
import {
PeerStorageBackend,
type PeerStorageOptions,
PeerWorkerStorageBackend,
} from './backend';

class PeerStorageClient extends OpProducer {
constructor(
port: MessagePort,
private readonly backend: PeerStorageBackend

Check failure on line 11 in packages/common/doc-storage/src/client/index.ts

View workflow job for this annotation

GitHub Actions / Lint

Property 'backend' is declared but its value is never read.
) {
super(port);
}

async connect() {
await this.send(new ConnectOp());
}

async disconnect() {
await this.send(new DisconnectOp());
}
}

export function createPeerStorageClient(opts: PeerStorageOptions) {
const channel = new MessageChannel();
const producerPort = channel.port1;
const consumerPort = channel.port2;

const backend = new PeerStorageBackend({
port: consumerPort,
storages: opts.storages,
});

const client = new PeerStorageClient(producerPort, backend);

return client;
}

export function createPeerWorkerStorageClient(opts: PeerStorageOptions) {
const channel = new MessageChannel();
const producerPort = channel.port1;
const consumerPort = channel.port2;

const backend = new PeerWorkerStorageBackend({
port: consumerPort,
storages: opts.storages,
});

const client = new PeerStorageClient(producerPort, backend);
return client;
}
1 change: 0 additions & 1 deletion packages/common/doc-storage/src/impls/cloud/blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import {
type ListedBlobRecord,
} from '../../storage';

// TODO(@forehalo): websocket?
interface CloudBlobStorageOptions extends BlobStorageOptions {
endpoint: string;
}
Expand Down
16 changes: 0 additions & 16 deletions packages/common/doc-storage/src/impls/sqlite/doc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ export class SqliteDocStorage extends DocStorage<SqliteDocStorageOptions> {
return this.db.deleteDoc(docId);
}

override async deleteSpace(): Promise<void> {
await this.disconnect();
// rm this.dbPath
}

override async getSpaceDocTimestamps(
after?: number
): Promise<Record<string, number> | null> {
Expand Down Expand Up @@ -101,15 +96,4 @@ export class SqliteDocStorage extends DocStorage<SqliteDocStorageOptions> {
updates.map(update => new Date(update.timestamp))
);
}

override async listDocHistories() {
return [];
}
override async getDocHistory() {
return null;
}

protected override async createDocHistory(): Promise<boolean> {
return false;
}
}
64 changes: 1 addition & 63 deletions packages/common/doc-storage/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,63 +1 @@
import type {
BlobStorage,
DocStorage,
Storage,
StorageOptions,
SyncStorage,
} from './storage';
import { Connection } from './storage';

interface StorageTypes {
doc: DocStorage;
blob: BlobStorage;
sync: SyncStorage;
}

export abstract class StorageManager extends Connection {
protected storages: Record<string, Storage> = {};

constructor(public readonly options: StorageOptions) {
super();
}

override async connect() {
if (!this.connected) {
await this.doConnect();
await Promise.all(
Object.values(this.storages).map(storage => storage.connect())
);
this._connected = true;
}
}

override async disconnect() {
await Promise.all(
Object.values(this.storages).map(storage => storage.disconnect())
);
await this.doDisconnect();
this._connected = false;
}

get<Type extends keyof StorageTypes>(type: Type): StorageTypes[Type] {
const storage = this.storages[type];
if (!storage) {
throw new Error(
`Unregistered storage type requested.\nWant: ${type}\n.Registered: ${Object.keys(this.storages).join(', ')}.`
);
}

// @ts-expect-error we have typecheck on adding
return storage;
}

add<Type extends keyof StorageTypes>(
type: Type,
storage: StorageTypes[Type]
) {
this.storages[type] = storage;
}

remove<Type extends keyof StorageTypes>(type: Type) {
delete this.storages[type];
}
}
export * from './client';
Loading

0 comments on commit 5fff4e0

Please sign in to comment.