Skip to content

Commit

Permalink
refactor(doc-storage): operation pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Oct 29, 2024
1 parent b160b2b commit 8042850
Show file tree
Hide file tree
Showing 33 changed files with 1,293 additions and 783 deletions.
58 changes: 58 additions & 0 deletions packages/backend/server/src/core/sync/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,23 @@ interface LeaveSpaceAwarenessMessage {
docId: string;
}

/**
* @deprecated
*/
interface PushDocUpdatesMessage {
spaceType: SpaceType;
spaceId: string;
docId: string;
updates: string[];
}

interface PushDocUpdateMessage {
spaceType: SpaceType;
spaceId: string;
docId: string;
update: string;
}

interface LoadDocMessage {
spaceType: SpaceType;
spaceId: string;
Expand Down Expand Up @@ -237,6 +247,9 @@ export class SpaceSyncGateway
};
}

/**
* @deprecated use [space:push-doc-update] instead, client should always merge updates on their own
*/
@SubscribeMessage('space:push-doc-updates')
async onReceiveDocUpdates(
@ConnectedSocket() client: Socket,
Expand Down Expand Up @@ -281,6 +294,51 @@ export class SpaceSyncGateway
};
}

@SubscribeMessage('space:push-doc-update')
async onReceiveDocUpdate(
@ConnectedSocket() client: Socket,
@CurrentUser() user: CurrentUser,
@MessageBody()
message: PushDocUpdateMessage
): Promise<EventResponse<{ accepted: true; timestamp?: number }>> {
const { spaceType, spaceId, docId, update } = message;
const adapter = this.selectAdapter(client, spaceType);

// TODO(@forehalo): we might need to check write permission before push updates
const timestamp = await adapter.push(
spaceId,
docId,
[Buffer.from(update, 'base64')],
user.id
);

// TODO(@forehalo): separate different version of clients into different rooms,
// so the clients won't receive useless updates events
client.to(adapter.room(spaceId)).emit('space:broadcast-doc-updates', {
spaceType,
spaceId,
docId,
updates: [update],
timestamp,
});

client.to(adapter.room(spaceId)).emit('space:broadcast-doc-update', {
spaceType,
spaceId,
docId,
update,
timestamp,
editor: user.id,
});

return {
data: {
accepted: true,
timestamp,
},
};
}

@SubscribeMessage('space:load-doc-timestamps')
async onLoadDocTimestamps(
@ConnectedSocket() client: Socket,
Expand Down
97 changes: 0 additions & 97 deletions packages/common/doc-storage/README.md

This file was deleted.

5 changes: 2 additions & 3 deletions packages/common/doc-storage/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
"private": true,
"sideEffects": false,
"exports": {
".": "./index.ts",
"./impls/*": "./impls/*",
"./storage": "./storage/index.ts"
".": "./index.ts"
},
"dependencies": {
"@affine/graphql": "workspace:*",
"@affine/native": "workspace:*",
"eventemitter2": "^6.4.9",
"idb": "^8.0.0",
"lodash-es": "^4.17.21",
"socket.io-client": "^4.7.5",
Expand Down
79 changes: 79 additions & 0 deletions packages/common/doc-storage/src/client/backend.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import {
ConnectOp,
DisconnectOp,
OpConsumer,
type OpHandler,
type OpSubscribableHandler,
SubscribeConnectionStatusOp,
} from '../op';
import type { Storage } from '../storage';

export class PeerStorageBackend extends OpConsumer {
private storages: Storage[] = [];
private readonly storageOpts: { impl: string; opts: any }[] = [];

constructor(port: MessagePort) {
super(port);
this.register(ConnectOp, this.connect);
this.register(DisconnectOp, this.disconnect);
this.registerSubscribable(
SubscribeConnectionStatusOp,
this.onStatusChanged
);
}

addBackendStorage<T extends new (opts: any) => Storage>(
impl: T,
opts: ConstructorParameters<T>[0]
) {
this.storageOpts.push({ impl: impl.name, opts });
}

connect: OpHandler<ConnectOp> = async () => {
await Promise.all(
this.storageOpts.map(async _impl => {
// const storage = new StorageImplementations[impl.impl](impl.opts);
// await storage.connect();
// storage.register(this.port);
// this.storages[impl.type] = storage;
})
);
};

disconnect: OpHandler<DisconnectOp> = async () => {
await Promise.all(
Object.values(this.storages).map(async storage => {
await storage.disconnect();
})
);
this.storages = [];
};

onStatusChanged: OpSubscribableHandler<SubscribeConnectionStatusOp> = (
_,
callback
) => {
callback(/* { status: 'connected' } */);

return () => {};
};
}

export class PeerWorkerStorageBackend extends PeerStorageBackend {
// override async connect() {
// const worker = await getAndInitWorkerInSomewhere();
// the worker should proxy all 'op' messages to it's true backend
// worker.postMessage(
// {
// type: 'create-storage-worker-backend',
// storages: this.opts.storages,
// port: this.port,
// },
// [
// // transfer ownership of consumer port to worker,
// // this port is no longer usable in main thread
// this.port,
// ]
// );
// }
}
55 changes: 55 additions & 0 deletions packages/common/doc-storage/src/client/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import {
ConnectOp,
DisconnectOp,
OpProducer,
SubscribeConnectionStatusOp,
} from '../op';
import { PeerStorageBackend, PeerWorkerStorageBackend } from './backend';

class PeerStorageClient extends OpProducer {
constructor(
port: MessagePort,
protected readonly backend: PeerStorageBackend
) {
super(port);
}

addStorage = this.backend.addBackendStorage.bind(this.backend);

async connect() {
this.listen();
await this.send(new ConnectOp());
}

async disconnect() {
this.close();
await this.send(new DisconnectOp());
}

onConnectionStatusChanged() {
this.subscribe(new SubscribeConnectionStatusOp(), (/* storage */) => {});
}
}

export function createPeerStorageClient() {
const channel = new MessageChannel();
const producerPort = channel.port1;
const consumerPort = channel.port2;

const backend = new PeerStorageBackend(consumerPort);

const client = new PeerStorageClient(producerPort, backend);

return client;
}

export function createPeerWorkerStorageClient() {
const channel = new MessageChannel();
const producerPort = channel.port1;
const consumerPort = channel.port2;

const backend = new PeerWorkerStorageBackend(consumerPort);

const client = new PeerStorageClient(producerPort, backend);
return client;
}
Loading

0 comments on commit 8042850

Please sign in to comment.