Skip to content

Commit

Permalink
feat(doc-storage): impl blob storages
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Oct 17, 2024
1 parent 9a6c896 commit db66969
Show file tree
Hide file tree
Showing 12 changed files with 708 additions and 97 deletions.
140 changes: 140 additions & 0 deletions packages/common/doc-storage/src/impls/cloud/blob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import {
type Blob,
BlobStorage,
type DocStorageOptions,
type ListedBlob,
} from '../../storage';
import type { Socket } from './socket';

interface CloudBlobStorageOptions extends DocStorageOptions {
socket: Socket;
}

export class CloudBlobStorage extends BlobStorage<CloudBlobStorageOptions> {
get socket() {
return this.options.socket;
}

override async connect(): Promise<void> {
// the event will be polled, there is no need to wait for socket to be connected
await this.clientHandShake();
// this.socket.on('space:broadcast-blob-update', this.onServerUpdates);
}

private async clientHandShake() {
const res = await this.socket.emitWithAck('space:join', {
spaceType: this.spaceType,
spaceId: this.spaceId,
clientVersion: BUILD_CONFIG.appVersion,
});

if ('error' in res) {
// TODO(@forehalo): use [UserFriendlyError]
throw new Error(res.error.message);
}
}

override async disconnect(): Promise<void> {
this.socket.emit('space:leave', {
spaceType: this.spaceType,
spaceId: this.spaceId,
});
// this.socket.off('space:broadcast-doc-updates', this.onServerUpdate);
}

// onServerUpdate: ServerEventsMap['space:broadcast-blob-update'] = message => {
// if (
// this.spaceType === message.spaceType &&
// this.spaceId === message.spaceId
// ) {
// // how do we deal with the data?
// }
// };

override async getBlob(key: string): Promise<Blob | null> {
const res = await this.socket.emitWithAck('space:get-blob', {
spaceType: this.spaceType,
spaceId: this.spaceId,
key,
});

if ('error' in res) {
// TODO: use [UserFriendlyError]
throw new Error(res.error.message);
}

return {
...res.data,
data: base64ToUint8Array(res.data.data),
};
}

override async setBlob(blob: Blob): Promise<void> {
this.socket.emit('space:set-blob', {
spaceType: this.spaceType,
spaceId: this.spaceId,
key: blob.key,
data: await uint8ArrayToBase64(blob.data),
mime: blob.mime,
});
}

override async deleteBlob(key: string, permanently: boolean): Promise<void> {
this.socket.emit('space:delete-blob', {
spaceType: this.spaceType,
spaceId: this.spaceId,
key,
permanently,
});
}

override async releaseBlobs(): Promise<void> {
this.socket.emit('space:release-blobs', {
spaceType: this.spaceType,
spaceId: this.spaceId,
});
}

override async listBlobs(): Promise<ListedBlob[]> {
const res = await this.socket.emitWithAck('space:list-blobs', {
spaceType: this.spaceType,
spaceId: this.spaceId,
});

if ('error' in res) {
// TODO: use [UserFriendlyError]
throw new Error(res.error.message);
}

return res.data;
}
}

export function uint8ArrayToBase64(array: Uint8Array): Promise<string> {
return new Promise<string>(resolve => {
// Create a blob from the Uint8Array
const blob = new Blob([array]);

const reader = new FileReader();
reader.onload = function () {
const dataUrl = reader.result as string | null;
if (!dataUrl) {
resolve('');
return;
}
// The result includes the `data:` URL prefix and the MIME type. We only want the Base64 data
const base64 = dataUrl.split(',')[1];
resolve(base64);
};

reader.readAsDataURL(blob);
});
}

export function base64ToUint8Array(base64: string) {
const binaryString = atob(base64);
const binaryArray = binaryString.split('').map(function (char) {
return char.charCodeAt(0);
});
return new Uint8Array(binaryArray);
}
78 changes: 2 additions & 76 deletions packages/common/doc-storage/src/impls/cloud/doc.ts
Original file line number Diff line number Diff line change
@@ -1,85 +1,11 @@
import type { Socket } from 'socket.io-client';

import { DocStorage, type DocStorageOptions } from '../../storage';

// TODO(@forehalo): use [UserFriendlyError]
interface EventError {
name: string;
message: string;
}

type WebsocketResponse<T> =
| {
error: EventError;
}
| {
data: T;
};

interface ServerEvents {
'space:broadcast-doc-updates': {
spaceType: string;
spaceId: string;
docId: string;
updates: string[];
timestamp: number;
};
}

interface ClientEvents {
'space:join': [
{ spaceType: string; spaceId: string; clientVersion: string },
{ clientId: string },
];
'space:leave': { spaceType: string; spaceId: string };
'space:push-doc-updates': [
{ spaceType: string; spaceId: string; docId: string; updates: string[] },
{ timestamp: number },
];
'space:load-doc-timestamps': [
{
spaceType: string;
spaceId: string;
timestamp?: number;
},
Record<string, number>,
];
'space:load-doc': [
{
spaceType: string;
spaceId: string;
docId: string;
stateVector?: string;
},
{
missing: string;
state: string;
timestamp: number;
},
];
}

type ServerEventsMap = {
[Key in keyof ServerEvents]: (data: ServerEvents[Key]) => void;
};
type ClientEventsMap = {
[Key in keyof ClientEvents]: ClientEvents[Key] extends Array<any>
? (
data: ClientEvents[Key][0],
ack: (res: WebsocketResponse<ClientEvents[Key][1]>) => void
) => void
: (data: ClientEvents[Key]) => void;
};
import type { ServerEventsMap, Socket } from './socket';

interface CloudDocStorageOptions extends DocStorageOptions {
socket: Socket<ServerEventsMap, ClientEventsMap>;
socket: Socket;
}

export class CloudDocStorage extends DocStorage<CloudDocStorageOptions> {
constructor(options: CloudDocStorageOptions) {
super(options);
}

get name() {
// @ts-expect-error we need it
return this.options.socket.io.uri;
Expand Down
118 changes: 118 additions & 0 deletions packages/common/doc-storage/src/impls/cloud/socket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import type { Socket as IO } from 'socket.io-client';

// TODO(@forehalo): use [UserFriendlyError]
interface EventError {
name: string;
message: string;
}

type WebsocketResponse<T> =
| {
error: EventError;
}
| {
data: T;
};

interface ServerEvents {
'space:broadcast-doc-updates': {
spaceType: string;
spaceId: string;
docId: string;
updates: string[];
timestamp: number;
};
'space:broadcast-blob-update': {
spaceType: string;
spaceId: string;
key: string;
data: string;
mime: string;
};
}

interface ClientEvents {
'space:join': [
{ spaceType: string; spaceId: string; clientVersion: string },
{ clientId: string },
];
'space:leave': { spaceType: string; spaceId: string };
'space:push-doc-updates': [
{ spaceType: string; spaceId: string; docId: string; updates: string[] },
{ timestamp: number },
];
'space:load-doc-timestamps': [
{
spaceType: string;
spaceId: string;
timestamp?: number;
},
Record<string, number>,
];
'space:load-doc': [
{
spaceType: string;
spaceId: string;
docId: string;
stateVector?: string;
},
{
missing: string;
state: string;
timestamp: number;
},
];

// blobs
'space:get-blob': [
{
spaceType: string;
spaceId: string;
key: string;
},
{
key: string;
data: string;
mime: string;
},
];
'space:set-blob': {
spaceType: string;
spaceId: string;
key: string;
data: string;
mime: string;
};
'space:delete-blob': {
spaceType: string;
spaceId: string;
key: string;
permanently: boolean;
};
'space:release-blobs': {
spaceType: string;
spaceId: string;
};
'space:list-blobs': [
{
spaceType: string;
spaceId: string;
},
{ key: string; size: number }[],
];
}

export type ServerEventsMap = {
[Key in keyof ServerEvents]: (data: ServerEvents[Key]) => void;
};

export type ClientEventsMap = {
[Key in keyof ClientEvents]: ClientEvents[Key] extends Array<any>
? (
data: ClientEvents[Key][0],
ack: (res: WebsocketResponse<ClientEvents[Key][1]>) => void
) => void
: (data: ClientEvents[Key]) => void;
};

export type Socket = IO<ServerEventsMap, ClientEventsMap>;
Loading

0 comments on commit db66969

Please sign in to comment.