Skip to content

Commit

Permalink
dev
Browse files Browse the repository at this point in the history
  • Loading branch information
EYHN committed Dec 27, 2024
1 parent a1a6fd1 commit 7cad7ab
Show file tree
Hide file tree
Showing 33 changed files with 575 additions and 287 deletions.
55 changes: 53 additions & 2 deletions packages/common/nbstore/src/frontend/doc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ export type DocFrontendDocState = {
* the doc is syncing with remote peers
*/
syncing: boolean;
/**
* the doc is synced with remote peers
*/
synced: boolean;
/**
* the doc is retrying to sync with remote peers
*/
Expand Down Expand Up @@ -146,6 +150,7 @@ export class DocFrontend {
return combineLatest([frontendState$, syncState$]).pipe(
map(([frontend, sync]) => ({
...frontend,
synced: sync?.synced ?? false,
syncing: sync?.syncing ?? false,
syncRetrying: sync?.retrying ?? false,
syncErrorMessage: sync?.errorMessage ?? null,
Expand Down Expand Up @@ -443,7 +448,53 @@ export class DocFrontend {
}
});
}),
new Promise((_, reject) => {
new Promise<void>((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]).finally(() => {
sub?.unsubscribe();
});
}

async waitForDocLoaded(docId: string, abort?: AbortSignal) {
let sub: Subscription | undefined = undefined;
return Promise.race([
new Promise<void>(resolve => {
sub = this.docState$(docId).subscribe(state => {
if (state.loaded) {
resolve();
}
});
}),
new Promise<void>((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]).finally(() => {
sub?.unsubscribe();
});
}

async waitForDocSynced(docId: string, abort?: AbortSignal) {
let sub: Subscription | undefined = undefined;
return Promise.race([
new Promise<void>(resolve => {
sub = this.docState$(docId).subscribe(state => {
if (state.syncing) {
resolve();
}
});
}),
new Promise<void>((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
Expand All @@ -466,7 +517,7 @@ export class DocFrontend {
}
});
}),
new Promise((_, reject) => {
new Promise<void>((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
Expand Down
8 changes: 6 additions & 2 deletions packages/common/nbstore/src/impls/cloud/awareness.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@ import {
} from './socket';

interface CloudAwarenessStorageOptions extends AwarenessStorageOptions {
socketOptions: SocketOptions;
socketOptions?: SocketOptions;
serverBaseUrl: string;
}

export class CloudAwarenessStorage extends AwarenessStorageBase<CloudAwarenessStorageOptions> {
static readonly identifier = 'CloudAwarenessStorage';

connection = share(
new SocketConnection(this.peer, this.options.socketOptions)
new SocketConnection(
`${this.options.serverBaseUrl}/`,
this.options.socketOptions
)
);

private get socket() {
Expand Down
48 changes: 22 additions & 26 deletions packages/common/nbstore/src/impls/cloud/blob.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,29 @@
import {
deleteBlobMutation,
gqlFetcherFactory,
listBlobsQuery,
releaseDeletedBlobsMutation,
setBlobMutation,
} from '@affine/graphql';

import { DummyConnection } from '../../connection';
import {
type BlobRecord,
BlobStorageBase,
type BlobStorageOptions,
} from '../../storage';
import { HttpConnection } from './http';

interface CloudBlobStorageOptions extends BlobStorageOptions {
apiBaseUrl: string;
serverBaseUrl: string;
}

export class CloudBlobStorage extends BlobStorageBase<CloudBlobStorageOptions> {
static readonly identifier = 'CloudBlobStorage';

private readonly gql = gqlFetcherFactory(
this.options.apiBaseUrl + '/graphql'
);
override connection = new DummyConnection();
readonly connection = new HttpConnection(this.options.serverBaseUrl);

override async get(key: string) {
const res = await fetch(
this.options.apiBaseUrl +
'/api/workspaces/' +
this.spaceId +
'/blobs/' +
key,
const res = await this.connection.fetch(
'/api/workspaces/' + this.spaceId + '/blobs/' + key,
{
cache: 'default',
headers: {
Expand All @@ -40,23 +32,27 @@ export class CloudBlobStorage extends BlobStorageBase<CloudBlobStorageOptions> {
}
);

if (!res.ok) {
if (res.status === 404) {
return null;
}

const data = await res.arrayBuffer();
try {
const blob = await res.blob();

return {
key,
data: new Uint8Array(data),
mime: res.headers.get('content-type') || '',
size: data.byteLength,
createdAt: new Date(res.headers.get('last-modified') || Date.now()),
};
return {
key,
data: new Uint8Array(await blob.arrayBuffer()),
mime: blob.type,
size: blob.size,
createdAt: new Date(res.headers.get('last-modified') || Date.now()),
};
} catch (err) {
throw new Error('blob download error: ' + err);
}
}

override async set(blob: BlobRecord) {
await this.gql({
await this.connection.gql({
query: setBlobMutation,
variables: {
workspaceId: this.spaceId,
Expand All @@ -66,21 +62,21 @@ export class CloudBlobStorage extends BlobStorageBase<CloudBlobStorageOptions> {
}

override async delete(key: string, permanently: boolean) {
await this.gql({
await this.connection.gql({
query: deleteBlobMutation,
variables: { workspaceId: this.spaceId, key, permanently },
});
}

override async release() {
await this.gql({
await this.connection.gql({
query: releaseDeletedBlobsMutation,
variables: { workspaceId: this.spaceId },
});
}

override async list() {
const res = await this.gql({
const res = await this.connection.gql({
query: listBlobsQuery,
variables: { workspaceId: this.spaceId },
});
Expand Down
76 changes: 76 additions & 0 deletions packages/common/nbstore/src/impls/cloud/doc-static.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import {
type DocClock,
type DocClocks,
type DocRecord,
DocStorageBase,
type DocStorageOptions,
type DocUpdate,
} from '../../storage';
import { HttpConnection } from './http';

interface CloudDocStorageOptions extends DocStorageOptions {
serverBaseUrl: string;
}

export class StaticCloudDocStorage extends DocStorageBase<CloudDocStorageOptions> {
override connection = new HttpConnection(this.options.serverBaseUrl);
override async pushDocUpdate(
update: DocUpdate,
_origin?: string
): Promise<DocClock> {
// http is readonly
return { docId: update.docId, timestamp: new Date() };
}
override async getDocTimestamp(docId: string): Promise<DocClock | null> {
// http doesn't support this, so we just return a new timestamp
return {
docId,
timestamp: new Date(),
};
}
override async getDocTimestamps(): Promise<DocClocks> {
// http doesn't support this
return {};
}
override deleteDoc(_docId: string): Promise<void> {
// http is readonly
return Promise.resolve();
}
protected override async getDocSnapshot(
docId: string
): Promise<DocRecord | null> {
const arrayBuffer = await this.connection.fetchArrayBuffer(
`/api/workspaces/${this.spaceId}/docs/${docId}`,
{
priority: 'high',
headers: {
Accept: 'application/octet-stream', // this is necessary for ios native fetch to return arraybuffer
},
}
);
if (!arrayBuffer) {
return null;
}
return {
docId: docId,
bin: new Uint8Array(arrayBuffer),
timestamp: new Date(),
};
}
protected override setDocSnapshot(
_snapshot: DocRecord,
_prevSnapshot: DocRecord | null
): Promise<boolean> {
// http is readonly
return Promise.resolve(false);
}
protected override getDocUpdates(_docId: string): Promise<DocRecord[]> {
return Promise.resolve([]);
}
protected override markUpdatesMerged(
_docId: string,
_updates: DocRecord[]
): Promise<number> {
return Promise.resolve(0);
}
}
2 changes: 1 addition & 1 deletion packages/common/nbstore/src/impls/cloud/doc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
} from './socket';

interface CloudDocStorageOptions extends DocStorageOptions {
socketOptions: SocketOptions;
socketOptions?: SocketOptions;
serverBaseUrl: string;
}

Expand Down
69 changes: 69 additions & 0 deletions packages/common/nbstore/src/impls/cloud/http.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { gqlFetcherFactory } from '@affine/graphql';

import { DummyConnection } from '../../connection';

export class HttpConnection extends DummyConnection {
readonly fetch = async (input: string, init?: RequestInit) => {
const externalSignal = init?.signal;
if (externalSignal?.aborted) {
throw externalSignal.reason;
}
const abortController = new AbortController();
externalSignal?.addEventListener('abort', reason => {
abortController.abort(reason);
});

const timeout = 15000;
const timeoutId = setTimeout(() => {
abortController.abort('timeout');
}, timeout);

const res = await globalThis
.fetch(new URL(input, this.serverBaseUrl), {
...init,
signal: abortController.signal,
headers: {
...init?.headers,
'x-affine-version': BUILD_CONFIG.appVersion,
},
})
.catch(err => {
throw new Error('fetch error: ' + err);
});
clearTimeout(timeoutId);
if (!res.ok && res.status !== 404) {
let reason: string | any = '';
if (res.headers.get('Content-Type')?.includes('application/json')) {
try {
reason = await res.json();
} catch {
// ignore
}
}
throw new Error('fetch error status: ' + res.status + ' ' + reason);
}
return res;
};

readonly fetchArrayBuffer = async (input: string, init?: RequestInit) => {
const res = await this.fetch(input, init);
if (res.status === 404) {
// 404
return null;
}
try {
return await res.arrayBuffer();
} catch (err) {
throw new Error('fetch download error: ' + err);
}
};

readonly gql = gqlFetcherFactory(
new URL('/graphql', this.serverBaseUrl).href,
this.fetch
);

constructor(private readonly serverBaseUrl: string) {
super();
}
}
1 change: 1 addition & 0 deletions packages/common/nbstore/src/impls/cloud/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './awareness';
export * from './blob';
export * from './doc';
export * from './doc-static';
2 changes: 1 addition & 1 deletion packages/common/nbstore/src/impls/cloud/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ export class SocketConnection extends AutoReconnectConnection<Socket> {

constructor(
private readonly endpoint: string,
private readonly socketOptions: SocketOptions
private readonly socketOptions?: SocketOptions
) {
super();
}
Expand Down
2 changes: 2 additions & 0 deletions packages/common/nbstore/src/sync/doc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export interface DocSyncState {
}

export interface DocSyncDocState {
synced: boolean;
syncing: boolean;
retrying: boolean;
errorMessage: string | null;
Expand Down Expand Up @@ -56,6 +57,7 @@ export class DocSyncImpl implements DocSync {
allPeers.find(peer => peer.errorMessage)?.errorMessage ?? null,
retrying: allPeers.some(peer => peer.retrying),
syncing: allPeers.some(peer => peer.syncing),
synced: allPeers.every(peer => peer.synced),
}))
);
}
Expand Down
Loading

0 comments on commit 7cad7ab

Please sign in to comment.