diff --git a/packages/backend/server/migrations/20241023065619_blobs/migration.sql b/packages/backend/server/migrations/20241023065619_blobs/migration.sql new file mode 100644 index 0000000000000..22fac2f05801b --- /dev/null +++ b/packages/backend/server/migrations/20241023065619_blobs/migration.sql @@ -0,0 +1,14 @@ +-- CreateTable +CREATE TABLE "blobs" ( + "workspace_id" VARCHAR NOT NULL, + "key" VARCHAR NOT NULL, + "size" INTEGER NOT NULL, + "mime" VARCHAR NOT NULL, + "created_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "deleted_at" TIMESTAMPTZ(3), + + CONSTRAINT "blobs_pkey" PRIMARY KEY ("workspace_id","key") +); + +-- AddForeignKey +ALTER TABLE "blobs" ADD CONSTRAINT "blobs_workspace_id_fkey" FOREIGN KEY ("workspace_id") REFERENCES "workspaces"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/packages/backend/server/schema.prisma b/packages/backend/server/schema.prisma index acf2df7fa647d..9086965c81708 100644 --- a/packages/backend/server/schema.prisma +++ b/packages/backend/server/schema.prisma @@ -106,6 +106,7 @@ model Workspace { permissions WorkspaceUserPermission[] pagePermissions WorkspacePageUserPermission[] features WorkspaceFeature[] + blobs Blob[] @@map("workspaces") } @@ -335,7 +336,7 @@ model UserSubscription { // yearly/monthly/lifetime recurring String @db.VarChar(20) // onetime subscription or anything else - variant String? @db.VarChar(20) + variant String? @db.VarChar(20) // subscription.id, null for linefetime payment or one time payment subscription stripeSubscriptionId String? @unique @map("stripe_subscription_id") // subscription.status, active/past_due/canceled/unpaid... @@ -499,3 +500,20 @@ model RuntimeConfig { @@unique([module, key]) @@map("app_runtime_settings") } + +// Blob table only exists for fast non-data queries. +// like, total size of blobs in a workspace, or blob list for sync service. +// it should only be a map of metadata of blobs stored anywhere else +model Blob { + workspaceId String @map("workspace_id") @db.VarChar + key String @db.VarChar + size Int @db.Integer + mime String @db.VarChar + createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(3) + deletedAt DateTime? 
@map("deleted_at") @db.Timestamptz(3) + + workspace Workspace @relation(fields: [workspaceId], references: [id], onDelete: Cascade) + + @@id([workspaceId, key]) + @@map("blobs") +} diff --git a/packages/backend/server/src/core/quota/resolver.ts b/packages/backend/server/src/core/quota/resolver.ts index c8afaa2237a01..309f43833ab6d 100644 --- a/packages/backend/server/src/core/quota/resolver.ts +++ b/packages/backend/server/src/core/quota/resolver.ts @@ -11,6 +11,7 @@ import { CurrentUser } from '../auth/session'; import { EarlyAccessType } from '../features'; import { UserType } from '../user'; import { QuotaService } from './service'; +import { QuotaManagementService } from './storage'; registerEnumType(EarlyAccessType, { name: 'EarlyAccessType', @@ -55,9 +56,18 @@ class UserQuotaType { humanReadable!: UserQuotaHumanReadableType; } +@ObjectType('UserQuotaUsage') +class UserQuotaUsageType { + @Field(() => SafeIntResolver, { name: 'storageQuota' }) + storageQuota!: number; +} + @Resolver(() => UserType) export class QuotaManagementResolver { - constructor(private readonly quota: QuotaService) {} + constructor( + private readonly quota: QuotaService, + private readonly management: QuotaManagementService + ) {} @ResolveField(() => UserQuotaType, { name: 'quota', nullable: true }) async getQuota(@CurrentUser() me: UserType) { @@ -65,4 +75,15 @@ export class QuotaManagementResolver { return quota.feature; } + + @ResolveField(() => UserQuotaUsageType, { name: 'quotaUsage' }) + async getQuotaUsage( + @CurrentUser() me: UserType + ): Promise { + const usage = await this.management.getUserStorageUsage(me.id); + + return { + storageQuota: usage, + }; + } } diff --git a/packages/backend/server/src/core/quota/storage.ts b/packages/backend/server/src/core/quota/storage.ts index ac77ace364f93..4e6910d635c50 100644 --- a/packages/backend/server/src/core/quota/storage.ts +++ b/packages/backend/server/src/core/quota/storage.ts @@ -40,7 +40,7 @@ export class QuotaManagementService { }; } - async getUserUsage(userId: string) { + async getUserStorageUsage(userId: string) { const workspaces = await this.permissions.getOwnedWorkspaces(userId); const sizes = await Promise.allSettled( @@ -88,7 +88,7 @@ export class QuotaManagementService { async getQuotaCalculator(userId: string) { const quota = await this.getUserQuota(userId); const { storageQuota, businessBlobLimit } = quota; - const usedSize = await this.getUserUsage(userId); + const usedSize = await this.getUserStorageUsage(userId); return this.generateQuotaCalculator( storageQuota, @@ -128,7 +128,7 @@ export class QuotaManagementService { }, } = await this.quota.getUserQuota(owner.id); // get all workspaces size of owner used - const usedSize = await this.getUserUsage(owner.id); + const usedSize = await this.getUserStorageUsage(owner.id); // relax restrictions if workspace has unlimited feature // todo(@darkskygit): need a mechanism to allow feature as a middleware to edit quota const unlimited = await this.feature.hasWorkspaceFeature( diff --git a/packages/backend/server/src/core/storage/wrappers/blob.ts b/packages/backend/server/src/core/storage/wrappers/blob.ts index 9aa991ab4b924..b6a80d31fe7cb 100644 --- a/packages/backend/server/src/core/storage/wrappers/blob.ts +++ b/packages/backend/server/src/core/storage/wrappers/blob.ts @@ -1,12 +1,12 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; +import { PrismaClient } from '@prisma/client'; import { - type BlobInputType, - Cache, Config, EventEmitter, 
type EventPayload, - type ListObjectsMetadata, + type GetObjectMetadata, + ListObjectsMetadata, OnEvent, type StorageProvider, StorageProviderFactory, @@ -14,62 +14,164 @@ import { @Injectable() export class WorkspaceBlobStorage { + private readonly logger = new Logger(WorkspaceBlobStorage.name); public readonly provider: StorageProvider; constructor( private readonly config: Config, private readonly event: EventEmitter, private readonly storageFactory: StorageProviderFactory, - private readonly cache: Cache + private readonly db: PrismaClient ) { this.provider = this.storageFactory.create(this.config.storages.blob); } - async put(workspaceId: string, key: string, blob: BlobInputType) { - await this.provider.put(`${workspaceId}/${key}`, blob); - await this.cache.delete(`blob-list:${workspaceId}`); + async put(workspaceId: string, key: string, blob: Buffer, mime: string) { + const meta: GetObjectMetadata = { + contentType: mime, + contentLength: blob.byteLength, + lastModified: new Date(), + }; + await this.provider.put(`${workspaceId}/${key}`, blob, meta); + this.trySyncBlobMeta(workspaceId, key, meta); } async get(workspaceId: string, key: string) { - return this.provider.get(`${workspaceId}/${key}`); + const blob = await this.provider.get(`${workspaceId}/${key}`); + this.trySyncBlobMeta(workspaceId, key, blob.metadata); + return blob; } async list(workspaceId: string) { - const cachedList = await this.cache.list( - `blob-list:${workspaceId}`, - 0, - -1 - ); - - if (cachedList.length > 0) { - return cachedList; + const blobsInDb = await this.db.blob.findMany({ + where: { + workspaceId, + deletedAt: null, + }, + }); + + if (blobsInDb.length > 0) { + return blobsInDb; } const blobs = await this.provider.list(workspaceId + '/'); + this.trySyncBlobsMeta(workspaceId, blobs); + + return blobs.map(blob => ({ + key: blob.key, + size: blob.contentLength, + createdAt: blob.lastModified, + mime: 'application/octet-stream', + })); + } - blobs.forEach(item => { - // trim workspace prefix - item.key = item.key.slice(workspaceId.length + 1); - }); - - await this.cache.pushBack(`blob-list:${workspaceId}`, ...blobs); - - return blobs; + async delete(workspaceId: string, key: string, permanently = false) { + if (permanently) { + await this.provider.delete(`${workspaceId}/${key}`); + await this.db.blob.deleteMany({ + where: { + workspaceId, + key, + }, + }); + } else { + await this.db.blob.update({ + where: { + workspaceId_key: { + workspaceId, + key, + }, + }, + data: { + deletedAt: new Date(), + }, + }); + } } - /** - * we won't really delete the blobs until the doc blobs manager is implemented sounded - */ - async delete(_workspaceId: string, _key: string) { - // return this.provider.delete(`${workspaceId}/${key}`); + async release(workspaceId: string) { + const deletedBlobs = await this.db.blob.findMany({ + where: { + workspaceId, + deletedAt: { + not: null, + }, + }, + }); + + deletedBlobs.forEach(blob => { + this.event.emit('workspace.blob.deleted', { + workspaceId: workspaceId, + key: blob.key, + }); + }); } async totalSize(workspaceId: string) { const blobs = await this.list(workspaceId); - // how could we ignore the ones get soft-deleted? 
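    // The DB-backed list() above filters out rows with a non-null `deletedAt`,
    // so soft-deleted blobs are already excluded from this sum.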
return blobs.reduce((acc, item) => acc + item.size, 0); } + private trySyncBlobsMeta(workspaceId: string, blobs: ListObjectsMetadata[]) { + for (const blob of blobs) { + this.trySyncBlobMeta(workspaceId, blob.key, { + contentType: 'application/octet-stream', + ...blob, + }); + } + } + + private trySyncBlobMeta( + workspaceId: string, + key: string, + meta?: GetObjectMetadata + ) { + setImmediate(() => { + this.syncBlobMeta(workspaceId, key, meta).catch(() => { + /* never throw */ + }); + }); + } + + private async syncBlobMeta( + workspaceId: string, + key: string, + meta?: GetObjectMetadata + ) { + try { + if (meta) { + await this.db.blob.upsert({ + where: { + workspaceId_key: { + workspaceId, + key, + }, + }, + update: { + mime: meta.contentType, + size: meta.contentLength, + }, + create: { + workspaceId, + key, + mime: meta.contentType, + size: meta.contentLength, + }, + }); + } else { + await this.db.blob.deleteMany({ + where: { + workspaceId, + key, + }, + }); + } + } catch (e) { + // never throw + this.logger.error('failed to sync blob meta to DB', e); + } + } + @OnEvent('workspace.deleted') async onWorkspaceDeleted(workspaceId: EventPayload<'workspace.deleted'>) { const blobs = await this.list(workspaceId); @@ -78,7 +180,7 @@ export class WorkspaceBlobStorage { blobs.forEach(blob => { this.event.emit('workspace.blob.deleted', { workspaceId: workspaceId, - name: blob.key, + key: blob.key, }); }); } @@ -86,8 +188,14 @@ export class WorkspaceBlobStorage { @OnEvent('workspace.blob.deleted') async onDeleteWorkspaceBlob({ workspaceId, - name, + key, }: EventPayload<'workspace.blob.deleted'>) { - await this.delete(workspaceId, name); + await this.db.blob.deleteMany({ + where: { + workspaceId, + key, + }, + }); + await this.delete(workspaceId, key); } } diff --git a/packages/backend/server/src/core/sync/gateway.ts b/packages/backend/server/src/core/sync/gateway.ts index b922e477bb6a3..b6e8809287a12 100644 --- a/packages/backend/server/src/core/sync/gateway.ts +++ b/packages/backend/server/src/core/sync/gateway.ts @@ -113,6 +113,7 @@ interface UpdateAwarenessMessage { docId: string; awarenessUpdate: string; } + @WebSocketGateway() export class SpaceSyncGateway implements OnGatewayConnection, OnGatewayDisconnect @@ -181,26 +182,6 @@ export class SpaceSyncGateway } } - async joinWorkspace( - client: Socket, - room: `${string}:${'sync' | 'awareness'}` - ) { - await client.join(room); - } - - async leaveWorkspace( - client: Socket, - room: `${string}:${'sync' | 'awareness'}` - ) { - await client.leave(room); - } - - assertInWorkspace(client: Socket, room: `${string}:${'sync' | 'awareness'}`) { - if (!client.rooms.has(room)) { - throw new NotInSpace({ spaceId: room.split(':')[0] }); - } - } - // v3 @SubscribeMessage('space:join') async onJoinSpace( diff --git a/packages/backend/server/src/core/workspaces/resolvers/blob.ts b/packages/backend/server/src/core/workspaces/resolvers/blob.ts index 82c8f6c1ca986..7edef0f431b6e 100644 --- a/packages/backend/server/src/core/workspaces/resolvers/blob.ts +++ b/packages/backend/server/src/core/workspaces/resolvers/blob.ts @@ -1,29 +1,40 @@ import { Logger, UseGuards } from '@nestjs/common'; import { Args, + Field, Int, Mutation, + ObjectType, Parent, Query, ResolveField, Resolver, } from '@nestjs/graphql'; -import { SafeIntResolver } from 'graphql-scalars'; import GraphQLUpload from 'graphql-upload/GraphQLUpload.mjs'; import type { FileUpload } from '../../../fundamentals'; -import { - BlobQuotaExceeded, - CloudThrottlerGuard, - MakeCache, - PreventCache, -} 
from '../../../fundamentals'; +import { BlobQuotaExceeded, CloudThrottlerGuard } from '../../../fundamentals'; import { CurrentUser } from '../../auth'; import { Permission, PermissionService } from '../../permission'; import { QuotaManagementService } from '../../quota'; import { WorkspaceBlobStorage } from '../../storage'; import { WorkspaceBlobSizes, WorkspaceType } from '../types'; +@ObjectType() +class ListedBlob { + @Field() + key!: string; + + @Field() + mime!: string; + + @Field() + size!: number; + + @Field() + createdAt!: string; +} + @UseGuards(CloudThrottlerGuard) @Resolver(() => WorkspaceType) export class WorkspaceBlobResolver { @@ -34,7 +45,7 @@ export class WorkspaceBlobResolver { private readonly storage: WorkspaceBlobStorage ) {} - @ResolveField(() => [String], { + @ResolveField(() => [ListedBlob], { description: 'List blobs of workspace', complexity: 2, }) @@ -44,9 +55,7 @@ export class WorkspaceBlobResolver { ) { await this.permissions.checkWorkspace(workspace.id, user.id); - return this.storage - .list(workspace.id) - .then(list => list.map(item => item.key)); + return this.storage.list(workspace.id); } @ResolveField(() => Int, { @@ -64,7 +73,6 @@ export class WorkspaceBlobResolver { description: 'List blobs of workspace', deprecationReason: 'use `workspace.blobs` instead', }) - @MakeCache(['blobs'], ['workspaceId']) async listBlobs( @CurrentUser() user: CurrentUser, @Args('workspaceId') workspaceId: string @@ -76,42 +84,15 @@ export class WorkspaceBlobResolver { .then(list => list.map(item => item.key)); } - /** - * @deprecated use `user.storageUsage` instead - */ @Query(() => WorkspaceBlobSizes, { - deprecationReason: 'use `user.storageUsage` instead', + deprecationReason: 'use `user.quotaUsage` instead', }) async collectAllBlobSizes(@CurrentUser() user: CurrentUser) { - const size = await this.quota.getUserUsage(user.id); + const size = await this.quota.getUserStorageUsage(user.id); return { size }; } - /** - * @deprecated mutation `setBlob` will check blob limit & quota usage - */ - @Query(() => WorkspaceBlobSizes, { - deprecationReason: 'no more needed', - }) - async checkBlobSize( - @CurrentUser() user: CurrentUser, - @Args('workspaceId') workspaceId: string, - @Args('size', { type: () => SafeIntResolver }) blobSize: number - ) { - const canWrite = await this.permissions.tryCheckWorkspace( - workspaceId, - user.id, - Permission.Write - ); - if (canWrite) { - const size = await this.quota.checkBlobQuota(workspaceId, blobSize); - return { size }; - } - return false; - } - @Mutation(() => String) - @PreventCache(['blobs'], ['workspaceId']) async setBlob( @CurrentUser() user: CurrentUser, @Args('workspaceId') workspaceId: string, @@ -155,20 +136,44 @@ export class WorkspaceBlobResolver { }); }); - await this.storage.put(workspaceId, blob.filename, buffer); + await this.storage.put(workspaceId, blob.filename, buffer, blob.mimetype); return blob.filename; } @Mutation(() => Boolean) - @PreventCache(['blobs'], ['workspaceId']) async deleteBlob( @CurrentUser() user: CurrentUser, @Args('workspaceId') workspaceId: string, - @Args('hash') name: string + @Args('hash', { + type: () => String, + deprecationReason: 'use parameter [key]', + nullable: true, + }) + hash?: string, + @Args('key', { type: () => String, nullable: true }) key?: string, + @Args('permanently', { type: () => Boolean, defaultValue: false }) + permanently = false + ) { + key = key ?? 
hash; + if (!key) { + return false; + } + + await this.permissions.checkWorkspace(workspaceId, user.id); + + await this.storage.delete(workspaceId, key, permanently); + + return true; + } + + @Mutation(() => Boolean) + async releaseDeletedBlobs( + @CurrentUser() user: CurrentUser, + @Args('workspaceId') workspaceId: string ) { await this.permissions.checkWorkspace(workspaceId, user.id); - await this.storage.delete(workspaceId, name); + await this.storage.release(workspaceId); return true; } diff --git a/packages/backend/server/src/fundamentals/event/def.ts b/packages/backend/server/src/fundamentals/event/def.ts index 33db14b7a2832..c9cb42381350a 100644 --- a/packages/backend/server/src/fundamentals/event/def.ts +++ b/packages/backend/server/src/fundamentals/event/def.ts @@ -7,7 +7,7 @@ export interface WorkspaceEvents { blob: { deleted: Payload<{ workspaceId: Workspace['id']; - name: string; + key: string; }>; }; } diff --git a/packages/backend/server/src/fundamentals/storage/providers/fs.ts b/packages/backend/server/src/fundamentals/storage/providers/fs.ts index 282396c98d0f0..c271d93ece789 100644 --- a/packages/backend/server/src/fundamentals/storage/providers/fs.ts +++ b/packages/backend/server/src/fundamentals/storage/providers/fs.ts @@ -119,7 +119,7 @@ export class FsStorageProvider implements StorageProvider { results.push({ key: res, lastModified: stat.mtime, - size: stat.size, + contentLength: stat.size, }); } } diff --git a/packages/backend/server/src/fundamentals/storage/providers/provider.ts b/packages/backend/server/src/fundamentals/storage/providers/provider.ts index 46f8ef688db35..5b2bad24b2473 100644 --- a/packages/backend/server/src/fundamentals/storage/providers/provider.ts +++ b/packages/backend/server/src/fundamentals/storage/providers/provider.ts @@ -21,7 +21,7 @@ export interface PutObjectMetadata { export interface ListObjectsMetadata { key: string; lastModified: Date; - size: number; + contentLength: number; } export type BlobInputType = Buffer | Readable | string; diff --git a/packages/backend/server/src/fundamentals/storage/providers/utils.ts b/packages/backend/server/src/fundamentals/storage/providers/utils.ts index a0eab7d8c5451..1f9fc52ed6e6e 100644 --- a/packages/backend/server/src/fundamentals/storage/providers/utils.ts +++ b/packages/backend/server/src/fundamentals/storage/providers/utils.ts @@ -24,7 +24,7 @@ export async function autoMetadata( try { // length if (!metadata.contentLength) { - metadata.contentLength = blob.length; + metadata.contentLength = blob.byteLength; } // checksum diff --git a/packages/backend/server/src/plugins/storage/providers/s3.ts b/packages/backend/server/src/plugins/storage/providers/s3.ts index 3a242704dbf28..3aabf11f747d6 100644 --- a/packages/backend/server/src/plugins/storage/providers/s3.ts +++ b/packages/backend/server/src/plugins/storage/providers/s3.ts @@ -140,7 +140,7 @@ export class S3StorageProvider implements StorageProvider { listResult.Contents.map(r => ({ key: r.Key!, lastModified: r.LastModified!, - size: r.Size!, + contentLength: r.Size!, })) ); } diff --git a/packages/backend/server/src/schema.gql b/packages/backend/server/src/schema.gql index 88234d6839482..a43852f85d683 100644 --- a/packages/backend/server/src/schema.gql +++ b/packages/backend/server/src/schema.gql @@ -409,6 +409,13 @@ input ListUserInput { skip: Int = 0 } +type ListedBlob { + createdAt: String! + key: String! + mime: String! + size: Int! 
+} + input ManageUserInput { """User email""" email: String @@ -455,7 +462,7 @@ type Mutation { """Create a new workspace""" createWorkspace(init: Upload): WorkspaceType! deleteAccount: DeleteAccount! - deleteBlob(hash: String!, workspaceId: String!): Boolean! + deleteBlob(hash: String @deprecated(reason: "use parameter [key]"), key: String, permanently: Boolean! = false, workspaceId: String!): Boolean! """Delete a user account""" deleteUser(id: String!): DeleteAccount! @@ -467,6 +474,7 @@ type Mutation { leaveWorkspace(sendLeaveMail: Boolean, workspaceId: String!, workspaceName: String!): Boolean! publishPage(mode: PublicPageMode = Page, pageId: String!, workspaceId: String!): WorkspacePage! recoverDoc(guid: String!, timestamp: DateTime!, workspaceId: String!): DateTime! + releaseDeletedBlobs(workspaceId: String!): Boolean! """Remove user avatar""" removeAvatar: RemoveAvatar! @@ -539,8 +547,7 @@ enum PublicPageMode { } type Query { - checkBlobSize(size: SafeInt!, workspaceId: String!): WorkspaceBlobSizes! @deprecated(reason: "no more needed") - collectAllBlobSizes: WorkspaceBlobSizes! @deprecated(reason: "use `user.storageUsage` instead") + collectAllBlobSizes: WorkspaceBlobSizes! @deprecated(reason: "use `user.quotaUsage` instead") """Get current user""" currentUser: UserType @@ -823,6 +830,10 @@ type UserQuotaHumanReadable { storageQuota: String! } +type UserQuotaUsage { + storageQuota: SafeInt! +} + type UserSubscription { canceledAt: DateTime createdAt: DateTime! @@ -872,6 +883,7 @@ type UserType { """User name""" name: String! quota: UserQuota + quotaUsage: UserQuotaUsage! subscription(plan: SubscriptionPlan = Pro): UserSubscription @deprecated(reason: "use `UserType.subscriptions`") subscriptions: [UserSubscription!]! token: tokenType! @deprecated(reason: "use [/api/auth/sign-in?native=true] instead") @@ -905,7 +917,7 @@ type WorkspaceType { availableFeatures: [FeatureType!]! """List blobs of workspace""" - blobs: [String!]! + blobs: [ListedBlob!]! """Blobs size of workspace""" blobsSize: Int! diff --git a/packages/backend/server/tests/utils/blobs.ts b/packages/backend/server/tests/utils/blobs.ts index 514fbeee468d1..f6832c41c401d 100644 --- a/packages/backend/server/tests/utils/blobs.ts +++ b/packages/backend/server/tests/utils/blobs.ts @@ -54,35 +54,16 @@ export async function collectAllBlobSizes( .send({ query: ` query { - collectAllBlobSizes { - size + currentUser { + quotaUsage { + storageQuota + } } } `, }) .expect(200); - return res.body.data.collectAllBlobSizes.size; -} - -export async function checkBlobSize( - app: INestApplication, - token: string, - workspaceId: string, - size: number -): Promise { - const res = await request(app.getHttpServer()) - .post(gql) - .auth(token, { type: 'bearer' }) - .send({ - query: `query checkBlobSize($workspaceId: String!, $size: SafeInt!) 
{ - checkBlobSize(workspaceId: $workspaceId, size: $size) { - size - } - }`, - variables: { workspaceId, size }, - }) - .expect(200); - return res.body.data.checkBlobSize.size; + return res.body.data.currentUser.quotaUsage.storageQuota; } export async function setBlob( diff --git a/packages/backend/server/tests/workspace-blobs.spec.ts b/packages/backend/server/tests/workspace/blobs.e2e.ts similarity index 76% rename from packages/backend/server/tests/workspace-blobs.spec.ts rename to packages/backend/server/tests/workspace/blobs.e2e.ts index 3e81d5fbf7aff..08dc6947fce4d 100644 --- a/packages/backend/server/tests/workspace-blobs.spec.ts +++ b/packages/backend/server/tests/workspace/blobs.e2e.ts @@ -2,11 +2,10 @@ import type { INestApplication } from '@nestjs/common'; import test from 'ava'; import request from 'supertest'; -import { AppModule } from '../src/app.module'; -import { FeatureManagementService, FeatureType } from '../src/core/features'; -import { QuotaService, QuotaType } from '../src/core/quota'; +import { AppModule } from '../../src/app.module'; +import { FeatureManagementService, FeatureType } from '../../src/core/features'; +import { QuotaService, QuotaType } from '../../src/core/quota'; import { - checkBlobSize, collectAllBlobSizes, createTestingApp, createWorkspace, @@ -14,7 +13,7 @@ import { listBlobs, setBlob, signUp, -} from './utils'; +} from '../utils'; const OneMB = 1024 * 1024; @@ -114,58 +113,6 @@ test('should calc all blobs size', async t => { const size = await collectAllBlobSizes(app, u1.token.token); t.is(size, 8, 'failed to collect all blob sizes'); - - const size1 = await checkBlobSize( - app, - u1.token.token, - workspace1.id, - 10 * 1024 * 1024 * 1024 - 8 - ); - t.is(size1, 0, 'failed to check blob size'); - - const size2 = await checkBlobSize( - app, - u1.token.token, - workspace1.id, - 10 * 1024 * 1024 * 1024 - 7 - ); - t.is(size2, -1, 'failed to check blob size'); -}); - -test('should be able calc quota after switch plan', async t => { - const u1 = await signUp(app, 'darksky', 'darksky@affine.pro', '1'); - - const workspace1 = await createWorkspace(app, u1.token.token); - - const buffer1 = Buffer.from([0, 0]); - await setBlob(app, u1.token.token, workspace1.id, buffer1); - const buffer2 = Buffer.from([0, 1]); - await setBlob(app, u1.token.token, workspace1.id, buffer2); - - const workspace2 = await createWorkspace(app, u1.token.token); - - const buffer3 = Buffer.from([0, 0]); - await setBlob(app, u1.token.token, workspace2.id, buffer3); - const buffer4 = Buffer.from([0, 1]); - await setBlob(app, u1.token.token, workspace2.id, buffer4); - - const size1 = await checkBlobSize( - app, - u1.token.token, - workspace1.id, - 10 * 1024 * 1024 * 1024 - 8 - ); - t.is(size1, 0, 'failed to check free plan blob size'); - - await quota.switchUserQuota(u1.id, QuotaType.ProPlanV1); - - const size2 = await checkBlobSize( - app, - u1.token.token, - workspace1.id, - 100 * 1024 * 1024 * 1024 - 8 - ); - t.is(size2, 0, 'failed to check pro plan blob size'); }); test('should reject blob exceeded limit', async t => { diff --git a/packages/common/doc-storage/README.md b/packages/common/doc-storage/README.md new file mode 100644 index 0000000000000..b1867d7243882 --- /dev/null +++ b/packages/common/doc-storage/README.md @@ -0,0 +1,97 @@ +# @affine/doc-storage + +## Storages + +### StorageNode + +```ts +import { StorageManager } from '@affine/doc-storage'; + +class CloudStorageManager extends StorageManager { + private readonly socket = io('http://endpoint'); + + 
constructor(options: StorageOptions) { + super(options); + this.add( + 'doc', + new CloudDocStorage({ + ...this.options, + socket: this.socket, + }) + ); + this.add( + 'blob', + new CloudBlobStorage({ + ...this.options, + socket: this.socket, + }) + ); + } + + override async doConnect() { + await this.socket.connect(); + } + + override async doDisconnect() { + await this.socket.close(); + } +} +``` + +### StorageEdgeNode + +```ts +interface SqliteStorageOptions extends StorageOptions { + dbPath: string; +} + +class SqliteStorage extends CloudStorageManager { + constructor(options: StorageOptions) { + super(options); + this.db = new Sqlite(this.options.dbPath); + + this.add('doc', new SqliteDocStorage({ ...this.options, db: this.db })); + this.add('blob', new SqliteBlobStorage({ ...this.options, db: this.db })); + this.add('sync', new SqliteSyncStorage({ ...this.options, db: this.db })); + } + + override async doConnect() { + await this.db.connect(); + } + + override async doDisconnect() { + await this.db.close(); + } +} +``` + +## Compose storages + +```ts +interface SqliteStorageOptions extends StorageOptions { + dbPath: string; +} + +class SqliteStorage extends CloudStorageManager { + idb!: SpaceIDB | null = null; + + constructor(options: StorageOptions) { + super(options); + this.db = new Sqlite(this.options.dbPath); + + this.add('doc', new SqliteDocStorage({ ...this.options, db: this.db })); + } + + override async doConnect() { + await this.db.connect(); + this.idb = await IDBProtocol.open(`${this.spaceType}:${this.spaceId}`); + this.add('blob', new IDBBlobStorage({ ...this.options, idb: this.idb })); + } + + override async doDisconnect() { + await this.db.close(); + await this.idb?.close(); + this.remove('blob'); + } +} +``` diff --git a/packages/common/doc-storage/package.json b/packages/common/doc-storage/package.json index 1a01a64f8805b..fe3ab520fb768 100644 --- a/packages/common/doc-storage/package.json +++ b/packages/common/doc-storage/package.json @@ -10,6 +10,7 @@ "./storage": "./storage/index.ts" }, "dependencies": { + "@affine/graphql": "workspace:*", "@affine/native": "workspace:*", "idb": "^8.0.0", "lodash-es": "^4.17.21", diff --git a/packages/common/doc-storage/src/impls/cloud/blob.ts b/packages/common/doc-storage/src/impls/cloud/blob.ts index 0abe7594b70c3..9ef0989e7e3e5 100644 --- a/packages/common/doc-storage/src/impls/cloud/blob.ts +++ b/packages/common/doc-storage/src/impls/cloud/blob.ts @@ -1,140 +1,94 @@ import { - type Blob, + deleteBlobMutation, + gqlFetcherFactory, + listBlobsQuery, + releaseDeletedBlobsMutation, + setBlobMutation, +} from '@affine/graphql'; + +import { + type BlobRecord as BlobType, BlobStorage, - type DocStorageOptions, - type ListedBlob, + type BlobStorageOptions, + type ListedBlobRecord, } from '../../storage'; -import type { Socket } from './socket'; -interface CloudBlobStorageOptions extends DocStorageOptions { - socket: Socket; +// TODO(@forehalo): websocket? 
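The `@affine/doc-storage` README added above only sketches how storages are registered; for orientation, here is a hedged usage sketch of the `StorageManager` API introduced in this package. `CloudStorageManager` is the README's illustrative class, and the workspace id and blob key below are placeholders, not shipped code:

```ts
// A minimal sketch, assuming the README's CloudStorageManager example class.
// StorageManager#connect/disconnect/get and BlobStorage#getBlob are the APIs
// added in this package; 'workspace-id' and 'blob-key' are placeholders.
const storage = new CloudStorageManager({ type: 'workspace', id: 'workspace-id' });

await storage.connect();

// Resolve the registered blob storage and read a blob record (or null).
const blob = await storage.get('blob').getBlob('blob-key');
if (blob) {
  console.log(blob.mime, blob.size, new Date(blob.createdAt));
}

await storage.disconnect();
```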
+interface CloudBlobStorageOptions extends BlobStorageOptions { + endpoint: string; } export class CloudBlobStorage extends BlobStorage { - get socket() { - return this.options.socket; - } + private readonly gql = gqlFetcherFactory(this.options.endpoint + '/graphql'); - override async connect(): Promise { - // the event will be polled, there is no need to wait for socket to be connected - await this.clientHandShake(); - // this.socket.on('space:broadcast-blob-update', this.onServerUpdates); + override async doConnect() { + return; } - private async clientHandShake() { - const res = await this.socket.emitWithAck('space:join', { - spaceType: this.spaceType, - spaceId: this.spaceId, - clientVersion: BUILD_CONFIG.appVersion, - }); - - if ('error' in res) { - // TODO(@forehalo): use [UserFriendlyError] - throw new Error(res.error.message); - } + override async doDisconnect() { + return; } - override async disconnect(): Promise { - this.socket.emit('space:leave', { - spaceType: this.spaceType, - spaceId: this.spaceId, - }); - // this.socket.off('space:broadcast-doc-updates', this.onServerUpdate); - } - - // onServerUpdate: ServerEventsMap['space:broadcast-blob-update'] = message => { - // if ( - // this.spaceType === message.spaceType && - // this.spaceId === message.spaceId - // ) { - // // how do we deal with the data? - // } - // }; - - override async getBlob(key: string): Promise { - const res = await this.socket.emitWithAck('space:get-blob', { - spaceType: this.spaceType, - spaceId: this.spaceId, - key, - }); - - if ('error' in res) { - // TODO: use [UserFriendlyError] - throw new Error(res.error.message); + override async getBlob(key: string): Promise { + const res = await fetch( + this.options.endpoint + + '/api/workspaces/' + + this.spaceId + + '/blobs/' + + key, + { cache: 'default' } + ); + + if (!res.ok) { + return null; } + const data = await res.arrayBuffer(); + return { - ...res.data, - data: base64ToUint8Array(res.data.data), + key, + data: new Uint8Array(data), + mime: res.headers.get('content-type') || '', + size: data.byteLength, + createdAt: new Date( + res.headers.get('last-modified') || Date.now() + ).getTime(), }; } - override async setBlob(blob: Blob): Promise { - this.socket.emit('space:set-blob', { - spaceType: this.spaceType, - spaceId: this.spaceId, - key: blob.key, - data: await uint8ArrayToBase64(blob.data), - mime: blob.mime, + override async setBlob(blob: BlobType): Promise { + await this.gql({ + query: setBlobMutation, + variables: { + workspaceId: this.spaceId, + blob: new File([blob.data], blob.key, { type: blob.mime }), + }, }); } override async deleteBlob(key: string, permanently: boolean): Promise { - this.socket.emit('space:delete-blob', { - spaceType: this.spaceType, - spaceId: this.spaceId, - key, - permanently, + await this.gql({ + query: deleteBlobMutation, + variables: { workspaceId: this.spaceId, key, permanently }, }); } override async releaseBlobs(): Promise { - this.socket.emit('space:release-blobs', { - spaceType: this.spaceType, - spaceId: this.spaceId, + await this.gql({ + query: releaseDeletedBlobsMutation, + variables: { workspaceId: this.spaceId }, }); } - override async listBlobs(): Promise { - const res = await this.socket.emitWithAck('space:list-blobs', { - spaceType: this.spaceType, - spaceId: this.spaceId, + override async listBlobs(): Promise { + const res = await this.gql({ + query: listBlobsQuery, + variables: { workspaceId: this.spaceId }, }); - if ('error' in res) { - // TODO: use [UserFriendlyError] - throw new 
Error(res.error.message); - } - - return res.data; + return res.workspace.blobs.map(blob => ({ + ...blob, + createdAt: new Date(blob.createdAt).getTime(), + })); } } - -export function uint8ArrayToBase64(array: Uint8Array): Promise { - return new Promise(resolve => { - // Create a blob from the Uint8Array - const blob = new Blob([array]); - - const reader = new FileReader(); - reader.onload = function () { - const dataUrl = reader.result as string | null; - if (!dataUrl) { - resolve(''); - return; - } - // The result includes the `data:` URL prefix and the MIME type. We only want the Base64 data - const base64 = dataUrl.split(',')[1]; - resolve(base64); - }; - - reader.readAsDataURL(blob); - }); -} - -export function base64ToUint8Array(base64: string) { - const binaryString = atob(base64); - const binaryArray = binaryString.split('').map(function (char) { - return char.charCodeAt(0); - }); - return new Uint8Array(binaryArray); -} diff --git a/packages/common/doc-storage/src/impls/cloud/doc.ts b/packages/common/doc-storage/src/impls/cloud/doc.ts index 6708090a5f2e3..d013b869813b7 100644 --- a/packages/common/doc-storage/src/impls/cloud/doc.ts +++ b/packages/common/doc-storage/src/impls/cloud/doc.ts @@ -1,27 +1,26 @@ import { DocStorage, type DocStorageOptions } from '../../storage'; -import type { ServerEventsMap, Socket } from './socket'; +import { + base64ToUint8Array, + type ServerEventsMap, + type Socket, + uint8ArrayToBase64, +} from './socket'; interface CloudDocStorageOptions extends DocStorageOptions { + endpoint: string; socket: Socket; } export class CloudDocStorage extends DocStorage { - get name() { - // @ts-expect-error we need it - return this.options.socket.io.uri; - } - - get socket() { + private get socket() { return this.options.socket; } - override async connect(): Promise { - // the event will be polled, there is no need to wait for socket to be connected - await this.clientHandShake(); - this.socket.on('space:broadcast-doc-updates', this.onServerUpdates); + get name() { + return this.options.endpoint; } - private async clientHandShake() { + override async doConnect(): Promise { const res = await this.socket.emitWithAck('space:join', { spaceType: this.spaceType, spaceId: this.spaceId, @@ -29,17 +28,17 @@ export class CloudDocStorage extends DocStorage { }); if ('error' in res) { - // TODO(@forehalo): use [UserFriendlyError] throw new Error(res.error.message); } + this.socket?.on('space:broadcast-doc-updates', this.onServerUpdates); } - override async disconnect(): Promise { + override async doDisconnect(): Promise { this.socket.emit('space:leave', { spaceType: this.spaceType, spaceId: this.spaceId, }); - this.socket.off('space:broadcast-doc-updates', this.onServerUpdates); + this.socket?.off('space:broadcast-doc-updates', this.onServerUpdates); } onServerUpdates: ServerEventsMap['space:broadcast-doc-updates'] = message => { @@ -152,32 +151,3 @@ export class CloudDocStorage extends DocStorage { return false; } } - -export function uint8ArrayToBase64(array: Uint8Array): Promise { - return new Promise(resolve => { - // Create a blob from the Uint8Array - const blob = new Blob([array]); - - const reader = new FileReader(); - reader.onload = function () { - const dataUrl = reader.result as string | null; - if (!dataUrl) { - resolve(''); - return; - } - // The result includes the `data:` URL prefix and the MIME type. 
We only want the Base64 data - const base64 = dataUrl.split(',')[1]; - resolve(base64); - }; - - reader.readAsDataURL(blob); - }); -} - -export function base64ToUint8Array(base64: string) { - const binaryString = atob(base64); - const binaryArray = binaryString.split('').map(function (char) { - return char.charCodeAt(0); - }); - return new Uint8Array(binaryArray); -} diff --git a/packages/common/doc-storage/src/impls/cloud/index.ts b/packages/common/doc-storage/src/impls/cloud/index.ts index f42f6dd754a91..d476ae6eb9b92 100644 --- a/packages/common/doc-storage/src/impls/cloud/index.ts +++ b/packages/common/doc-storage/src/impls/cloud/index.ts @@ -1 +1,2 @@ +export * from './blob'; export * from './doc'; diff --git a/packages/common/doc-storage/src/impls/cloud/socket.ts b/packages/common/doc-storage/src/impls/cloud/socket.ts index 7169728610317..f90dd8a1db50e 100644 --- a/packages/common/doc-storage/src/impls/cloud/socket.ts +++ b/packages/common/doc-storage/src/impls/cloud/socket.ts @@ -1,4 +1,4 @@ -import type { Socket as IO } from 'socket.io-client'; +import type { Socket as SocketIO } from 'socket.io-client'; // TODO(@forehalo): use [UserFriendlyError] interface EventError { @@ -22,13 +22,6 @@ interface ServerEvents { updates: string[]; timestamp: number; }; - 'space:broadcast-blob-update': { - spaceType: string; - spaceId: string; - key: string; - data: string; - mime: string; - }; } interface ClientEvents { @@ -37,6 +30,21 @@ interface ClientEvents { { clientId: string }, ]; 'space:leave': { spaceType: string; spaceId: string }; + 'space:join-awareness': [ + { + spaceType: string; + spaceId: string; + docId: string; + clientVersion: string; + }, + { clientId: string }, + ]; + 'space:leave-awareness': { + spaceType: string; + spaceId: string; + docId: string; + }; + 'space:push-doc-updates': [ { spaceType: string; spaceId: string; docId: string; updates: string[] }, { timestamp: number }, @@ -62,44 +70,6 @@ interface ClientEvents { timestamp: number; }, ]; - - // blobs - 'space:get-blob': [ - { - spaceType: string; - spaceId: string; - key: string; - }, - { - key: string; - data: string; - mime: string; - }, - ]; - 'space:set-blob': { - spaceType: string; - spaceId: string; - key: string; - data: string; - mime: string; - }; - 'space:delete-blob': { - spaceType: string; - spaceId: string; - key: string; - permanently: boolean; - }; - 'space:release-blobs': { - spaceType: string; - spaceId: string; - }; - 'space:list-blobs': [ - { - spaceType: string; - spaceId: string; - }, - { key: string; size: number }[], - ]; } export type ServerEventsMap = { @@ -115,4 +85,33 @@ export type ClientEventsMap = { : (data: ClientEvents[Key]) => void; }; -export type Socket = IO; +export type Socket = SocketIO; + +export function uint8ArrayToBase64(array: Uint8Array): Promise { + return new Promise(resolve => { + // Create a blob from the Uint8Array + const blob = new Blob([array]); + + const reader = new FileReader(); + reader.onload = function () { + const dataUrl = reader.result as string | null; + if (!dataUrl) { + resolve(''); + return; + } + // The result includes the `data:` URL prefix and the MIME type. 
We only want the Base64 data + const base64 = dataUrl.split(',')[1]; + resolve(base64); + }; + + reader.readAsDataURL(blob); + }); +} + +export function base64ToUint8Array(base64: string) { + const binaryString = atob(base64); + const binaryArray = binaryString.split('').map(function (char) { + return char.charCodeAt(0); + }); + return new Uint8Array(binaryArray); +} diff --git a/packages/common/doc-storage/src/impls/idb/blob.ts b/packages/common/doc-storage/src/impls/idb/blob.ts index 4d03dd78e82d7..2ebf085c4c357 100644 --- a/packages/common/doc-storage/src/impls/idb/blob.ts +++ b/packages/common/doc-storage/src/impls/idb/blob.ts @@ -1,71 +1,95 @@ -import { type Blob, BlobStorage, type ListedBlob } from '../../storage'; -import { type SpaceIDB, SpaceIndexedDbManager } from './db'; +import { + type BlobRecord, + BlobStorage, + type BlobStorageOptions, + type ListedBlobRecord, +} from '../../storage'; +import { type SpaceIDB } from './db'; -export class IndexedDBBlobStorage extends BlobStorage { - private db!: SpaceIDB; +export interface IndexedDBBlobStorageOptions extends BlobStorageOptions { + db: SpaceIDB; +} - override async connect(): Promise { - this.db = await SpaceIndexedDbManager.open( - `${this.spaceType}:${this.spaceId}` - ); +export class IndexedDBBlobStorage extends BlobStorage { + get db() { + return this.options.db; } - override async getBlob(key: string): Promise { - const trx = this.db.transaction('blobs', 'readonly'); - const blob = await trx.store.get(key); + protected override doConnect(): Promise { + return Promise.resolve(); + } + protected override doDisconnect(): Promise { + return Promise.resolve(); + } + + override async getBlob(key: string): Promise { + const trx = this.db.transaction(['blobs', 'blobData'], 'readonly'); + const blob = await trx.objectStore('blobs').get(key); + const data = await trx.objectStore('blobData').get(key); - if (!blob || blob.deletedAt) { + if (!blob || blob.deletedAt || !data) { return null; } - return blob; + return { + ...blob, + data: data.data, + }; } - override async setBlob(blob: Blob): Promise { - const trx = this.db.transaction('blobs', 'readwrite'); - await trx.store.put({ - ...blob, - size: blob.data.length, - createdAt: new Date(), + override async setBlob(blob: BlobRecord): Promise { + const trx = this.db.transaction(['blobs', 'blobData'], 'readwrite'); + await trx.objectStore('blobs').put({ + key: blob.key, + mime: blob.mime, + size: blob.data.byteLength, + createdAt: Date.now(), deletedAt: null, }); + await trx.objectStore('blobData').put({ + key: blob.key, + data: blob.data, + }); } override async deleteBlob(key: string, permanently = false): Promise { - const trx = this.db.transaction('blobs', 'readwrite'); if (permanently) { - await trx.store.delete(key); + const trx = this.db.transaction(['blobs', 'blobData'], 'readwrite'); + await trx.objectStore('blobs').delete(key); + await trx.objectStore('blobData').delete(key); } else { + const trx = this.db.transaction('blobs', 'readwrite'); const blob = await trx.store.get(key); if (blob) { await trx.store.put({ ...blob, - deletedAt: new Date(), + deletedAt: Date.now(), }); } } } override async releaseBlobs(): Promise { - const trx = this.db.transaction('blobs', 'readwrite'); + const trx = this.db.transaction(['blobs', 'blobData'], 'readwrite'); - const it = trx.store.iterate(); + const it = trx.objectStore('blobs').iterate(); for await (const item of it) { if (item.value.deletedAt) { await item.delete(); + await trx.objectStore('blobData').delete(item.value.key); } } } - 
override async listBlobs(): Promise { + override async listBlobs(): Promise { const trx = this.db.transaction('blobs', 'readonly'); const it = trx.store.iterate(); - const blobs: ListedBlob[] = []; + const blobs: ListedBlobRecord[] = []; for await (const item of it) { if (!item.value.deletedAt) { - blobs.push({ key: item.value.key, size: item.value.size }); + blobs.push(item.value); } } diff --git a/packages/common/doc-storage/src/impls/idb/db.ts b/packages/common/doc-storage/src/impls/idb/db.ts index 7790ab320056b..d1361cfa1a2a2 100644 --- a/packages/common/doc-storage/src/impls/idb/db.ts +++ b/packages/common/doc-storage/src/impls/idb/db.ts @@ -4,29 +4,24 @@ import { type DocStorageSchema, latestVersion, migrate } from './schema'; export type SpaceIDB = IDBPDatabase; -export class SpaceIndexedDbManager { - private static db: SpaceIDB | null = null; - - static async open(name: string) { - if (this.db) { - return this.db; - } - +export const IDBProtocol = { + async open(name: string) { + let db: SpaceIDB | null = null; const blocking = () => { // notify user this connection is blocking other tabs to upgrade db - this.db?.close(); + db?.close(); }; const blocked = () => { // notify user there is tab opened with old version, close it first }; - this.db = await openDB(name, latestVersion, { + db = await openDB(name, latestVersion, { upgrade: migrate, blocking, blocked, }); - return this.db; - } -} + return db; + }, +}; diff --git a/packages/common/doc-storage/src/impls/idb/doc.ts b/packages/common/doc-storage/src/impls/idb/doc.ts index caeeac8093cf2..ac00789374431 100644 --- a/packages/common/doc-storage/src/impls/idb/doc.ts +++ b/packages/common/doc-storage/src/impls/idb/doc.ts @@ -1,21 +1,29 @@ -import { type DocRecord, DocStorage, type DocUpdate } from '../../storage'; -import { type SpaceIDB, SpaceIndexedDbManager } from './db'; - -export class IndexedDBDocStorage extends DocStorage { - private db!: SpaceIDB; +import { + type DocRecord, + DocStorage, + type DocStorageOptions, + type DocUpdate, +} from '../../storage'; +import { type SpaceIDB } from './db'; + +export interface IndexedDBDocStorageOptions extends DocStorageOptions { + db: SpaceIDB; +} +export class IndexedDBDocStorage extends DocStorage { get name() { return 'idb'; } - override async connect(): Promise { - this.db = await SpaceIndexedDbManager.open( - `${this.spaceType}:${this.spaceId}` - ); + get db() { + return this.options.db; } - override async disconnect(): Promise { - this.db.close(); + protected override doConnect(): Promise { + return Promise.resolve(); + } + protected override doDisconnect(): Promise { + return Promise.resolve(); } override async pushDocUpdates( diff --git a/packages/common/doc-storage/src/impls/idb/index.ts b/packages/common/doc-storage/src/impls/idb/index.ts index f42f6dd754a91..6cc1814ca653a 100644 --- a/packages/common/doc-storage/src/impls/idb/index.ts +++ b/packages/common/doc-storage/src/impls/idb/index.ts @@ -1 +1,3 @@ +export * from './blob'; +export * from './db'; export * from './doc'; diff --git a/packages/common/doc-storage/src/impls/idb/schema.ts b/packages/common/doc-storage/src/impls/idb/schema.ts index 8e2a1e7d45015..76819e582b0a6 100644 --- a/packages/common/doc-storage/src/impls/idb/schema.ts +++ b/packages/common/doc-storage/src/impls/idb/schema.ts @@ -7,24 +7,34 @@ IndexedDB > Table(...) 
Table(Snapshots) -|docId|blob|createdAt|updatedAt| -|-----|----|---------|---------| -| str | bin| Date | Date | +| docId | blob | createdAt | updatedAt | +|-------|------|-----------|-----------| +| str | bin | timestamp | timestamp | Table(Updates) -| id |docId|blob|createdAt| -|----|-----|----|---------| -|auto| str | bin| Date | +| id | docId | blob | createdAt | +|----|-------|------|-----------| +|auto| str | bin | timestamp | Table(Clocks) -| docId | clock | -|-------|--------| -| str | number | +| docId | clock | +|-------|-----------| +| str | timestamp | Table(Blobs) -| key | data | mime | size | createdAt | deletedAt | -|-----|------|------|------|-----------|-----------| -| str | bin | str | num | Date | Date | +| key | mime | size | createdAt | deletedAt | +|-----|------|------|-----------|-----------| +| str | str | num | timestamp | timestamp | + +Table(BlobData) +| key | data | +|-----|------| +| str | bin | + +Table(PeerClocks) +| peer | docId | clock | pushed | +|------|-------|-----------|-----------| +| str | str | timestamp | timestamp | */ export interface DocStorageSchema extends DBSchema { snapshots: { @@ -64,11 +74,29 @@ export interface DocStorageSchema extends DBSchema { key: string; value: { key: string; - data: Uint8Array; mime: string; size: number; - createdAt: Date; - deletedAt: Date | null; + createdAt: number; + deletedAt: number | null; + }; + }; + blobData: { + key: string; + value: { + key: string; + data: ArrayBuffer; + }; + }; + peerClocks: { + key: [string, string]; + value: { + peer: string; + docId: string; + clock: number; + pushedClock: number; + }; + indexes: { + peer: string; }; }; } @@ -116,10 +144,22 @@ const init: Migrate = db => { clocks.createIndex('timestamp', 'timestamp', { unique: false }); + const peerClocks = db.createObjectStore('peerClocks', { + keyPath: ['peerId', 'docId'], + autoIncrement: false, + }); + + peerClocks.createIndex('peer', 'peer', { unique: false }); + db.createObjectStore('blobs', { keyPath: 'key', autoIncrement: false, }); + + db.createObjectStore('blobData', { + keyPath: 'key', + autoIncrement: false, + }); }; // END REGION diff --git a/packages/common/doc-storage/src/impls/idb/sync.ts b/packages/common/doc-storage/src/impls/idb/sync.ts new file mode 100644 index 0000000000000..7021cbf8b83a1 --- /dev/null +++ b/packages/common/doc-storage/src/impls/idb/sync.ts @@ -0,0 +1,76 @@ +import { SyncStorage, type SyncStorageOptions } from '../../storage'; +import type { SpaceIDB } from './db'; + +export interface IndexedDBSyncStorageOptions extends SyncStorageOptions { + db: SpaceIDB; +} + +export class IndexedDBSyncStorage extends SyncStorage { + get db() { + return this.options.db; + } + + protected override doConnect(): Promise { + return Promise.resolve(); + } + + protected override doDisconnect(): Promise { + return Promise.resolve(); + } + + override async getPeerClocks(peer: string) { + const trx = this.db.transaction('peerClocks', 'readonly'); + + const records = await trx.store.index('peer').getAll(peer); + + return records.reduce( + (clocks, { docId, clock }) => { + clocks[docId] = clock; + return clocks; + }, + {} as Record + ); + } + + override async setPeerClock(peer: string, docId: string, clock: number) { + const trx = this.db.transaction('peerClocks', 'readwrite'); + const record = await trx.store.get([peer, docId]); + + await trx.store.put({ + peer, + docId, + clock: Math.max(record?.clock ?? 0, clock), + pushedClock: record?.pushedClock ?? 
0, + }); + } + + override async getPeerPushedClocks(peer: string) { + const trx = this.db.transaction('peerClocks', 'readonly'); + + const records = await trx.store.index('peer').getAll(peer); + + return records.reduce( + (clocks, { docId, pushedClock }) => { + clocks[docId] = pushedClock; + return clocks; + }, + {} as Record + ); + } + + override async setPeerPushedClock( + peer: string, + docId: string, + clock: number + ) { + const trx = this.db.transaction('peerClocks', 'readwrite'); + const record = await trx.store.get([peer, docId]); + + await trx.store.put({ + peer, + docId, + clock: record?.clock ?? 0, + pushedClock: Math.max(record?.pushedClock ?? 0, clock), + }); + } +} diff --git a/packages/common/doc-storage/src/impls/sqlite/blob.ts b/packages/common/doc-storage/src/impls/sqlite/blob.ts index a625a7f89f291..a57962356529a 100644 --- a/packages/common/doc-storage/src/impls/sqlite/blob.ts +++ b/packages/common/doc-storage/src/impls/sqlite/blob.ts @@ -1,10 +1,10 @@ import type { DocStorage as NativeDocStorage } from '@affine/native'; import { - type Blob, + type BlobRecord, BlobStorage, type BlobStorageOptions, - type ListedBlob, + type ListedBlobRecord, } from '../../storage'; interface SqliteBlobStorageOptions extends BlobStorageOptions { @@ -16,10 +16,27 @@ export class SqliteBlobStorage extends BlobStorage { return this.options.db; } - override getBlob(key: string): Promise { - return this.db.getBlob(key); + protected override doConnect(): Promise { + return Promise.resolve(); } - override setBlob(blob: Blob): Promise { + protected override doDisconnect(): Promise { + return Promise.resolve(); + } + + override async getBlob(key: string): Promise { + const blob = await this.db.getBlob(key); + + if (!blob) { + return null; + } + + return { + ...blob, + data: blob.data.buffer, + createdAt: blob.createdAt.getTime(), + }; + } + override setBlob(blob: BlobRecord): Promise { return this.db.setBlob({ ...blob, data: Buffer.from(blob.data), @@ -31,7 +48,13 @@ export class SqliteBlobStorage extends BlobStorage { override releaseBlobs(): Promise { return this.db.releaseBlobs(); } - override listBlobs(): Promise { - return this.db.listBlobs(); + + override async listBlobs(): Promise { + const blobs = await this.db.listBlobs(); + + return blobs.map(blob => ({ + ...blob, + createdAt: blob.createdAt.getTime(), + })); } } diff --git a/packages/common/doc-storage/src/impls/sqlite/db.ts b/packages/common/doc-storage/src/impls/sqlite/db.ts new file mode 100644 index 0000000000000..1a92db4a719ff --- /dev/null +++ b/packages/common/doc-storage/src/impls/sqlite/db.ts @@ -0,0 +1,3 @@ +import { DocStorage as NativeDocStorage } from '@affine/native'; + +export { NativeDocStorage }; diff --git a/packages/common/doc-storage/src/impls/sqlite/doc.ts b/packages/common/doc-storage/src/impls/sqlite/doc.ts index 570f137325318..0b4bda9e60970 100644 --- a/packages/common/doc-storage/src/impls/sqlite/doc.ts +++ b/packages/common/doc-storage/src/impls/sqlite/doc.ts @@ -20,8 +20,12 @@ export class SqliteDocStorage extends DocStorage { return this.options.db; } - constructor(options: SqliteDocStorageOptions) { - super(options); + protected override doConnect(): Promise { + return Promise.resolve(); + } + + protected override doDisconnect(): Promise { + return Promise.resolve(); } override pushDocUpdates( diff --git a/packages/common/doc-storage/src/impls/sqlite/index.ts b/packages/common/doc-storage/src/impls/sqlite/index.ts index f42f6dd754a91..6cc1814ca653a 100644 --- 
a/packages/common/doc-storage/src/impls/sqlite/index.ts +++ b/packages/common/doc-storage/src/impls/sqlite/index.ts @@ -1 +1,3 @@ +export * from './blob'; +export * from './db'; export * from './doc'; diff --git a/packages/common/doc-storage/src/impls/sqlite/sync.ts b/packages/common/doc-storage/src/impls/sqlite/sync.ts new file mode 100644 index 0000000000000..692211696f19c --- /dev/null +++ b/packages/common/doc-storage/src/impls/sqlite/sync.ts @@ -0,0 +1,54 @@ +import { SyncStorage, type SyncStorageOptions } from '../../storage'; +import type { NativeDocStorage } from './db'; + +export interface SqliteSyncStorageOptions extends SyncStorageOptions { + db: NativeDocStorage; +} + +export class SqliteDBSyncStorage extends SyncStorage { + get db() { + return this.options.db; + } + + protected override doConnect(): Promise { + return Promise.resolve(); + } + + protected override doDisconnect(): Promise { + return Promise.resolve(); + } + + override async getPeerClocks(peer: string) { + const records = await this.db.getPeerClocks(peer); + return records.reduce( + (clocks, { docId, timestamp }) => { + clocks[docId] = timestamp.getTime(); + return clocks; + }, + {} as Record + ); + } + + override async setPeerClock(peer: string, docId: string, clock: number) { + await this.db.setPeerClock(peer, docId, new Date(clock)); + } + + override async getPeerPushedClocks(peer: string) { + const records = await this.db.getPeerPushedClocks(peer); + return records.reduce( + (clocks, { docId, timestamp }) => { + clocks[docId] = timestamp.getTime(); + return clocks; + }, + {} as Record + ); + } + + override async setPeerPushedClock( + peer: string, + docId: string, + clock: number + ) { + await this.db.setPeerPushedClock(peer, docId, new Date(clock)); + } +} diff --git a/packages/common/doc-storage/src/index.ts b/packages/common/doc-storage/src/index.ts index 85674ee7cdb78..9f9fbdc024d27 100644 --- a/packages/common/doc-storage/src/index.ts +++ b/packages/common/doc-storage/src/index.ts @@ -1 +1,63 @@ -export * from './storage'; +import type { + BlobStorage, + DocStorage, + Storage, + StorageOptions, + SyncStorage, +} from './storage'; +import { Connection } from './storage'; + +interface StorageTypes { + doc: DocStorage; + blob: BlobStorage; + sync: SyncStorage; +} + +export abstract class StorageManager extends Connection { + protected storages: Record = {}; + + constructor(public readonly options: StorageOptions) { + super(); + } + + override async connect() { + if (!this.connected) { + await this.doConnect(); + await Promise.all( + Object.values(this.storages).map(storage => storage.connect()) + ); + this._connected = true; + } + } + + override async disconnect() { + await Promise.all( + Object.values(this.storages).map(storage => storage.disconnect()) + ); + await this.doDisconnect(); + this._connected = false; + } + + get(type: Type): StorageTypes[Type] { + const storage = this.storages[type]; + if (!storage) { + throw new Error( + `Unregistered storage type requested.\nWant: ${type}\n.Registered: ${Object.keys(this.storages).join(', ')}.` + ); + } + + // @ts-expect-error we have typecheck on adding + return storage; + } + + add( + type: Type, + storage: StorageTypes[Type] + ) { + this.storages[type] = storage; + } + + remove(type: Type) { + delete this.storages[type]; + } +} diff --git a/packages/common/doc-storage/src/storage/blob.ts b/packages/common/doc-storage/src/storage/blob.ts index a5d6559b4b859..28ef3dedabcdb 100644 --- a/packages/common/doc-storage/src/storage/blob.ts +++ 
b/packages/common/doc-storage/src/storage/blob.ts @@ -1,42 +1,28 @@ -import { Connection } from './connection'; +import { Storage, type StorageOptions } from './storage'; -export interface BlobStorageOptions { - spaceType: string; - spaceId: string; -} +export interface BlobStorageOptions extends StorageOptions {} -export interface Blob { +export interface BlobRecord { key: string; - data: Uint8Array; + data: ArrayBuffer; mime: string; + size: number; + createdAt: number; } -export interface ListedBlob { +export interface ListedBlobRecord { key: string; + mime: string; size: number; + createdAt: number; } export abstract class BlobStorage< Options extends BlobStorageOptions = BlobStorageOptions, -> extends Connection { - public readonly options: Options; - - constructor(opts: Options) { - super(); - this.options = opts; - } - - get spaceType() { - return this.options.spaceType; - } - - get spaceId() { - return this.options.spaceId; - } - - abstract getBlob(key: string): Promise; - abstract setBlob(blob: Blob): Promise; +> extends Storage { + abstract getBlob(key: string): Promise; + abstract setBlob(blob: BlobRecord): Promise; abstract deleteBlob(key: string, permanently: boolean): Promise; abstract releaseBlobs(): Promise; - abstract listBlobs(/* pagination? */): Promise; + abstract listBlobs(/* pagination? */): Promise; } diff --git a/packages/common/doc-storage/src/storage/connection.ts b/packages/common/doc-storage/src/storage/connection.ts index f82a72fbd3931..b4901cb259551 100644 --- a/packages/common/doc-storage/src/storage/connection.ts +++ b/packages/common/doc-storage/src/storage/connection.ts @@ -1,11 +1,26 @@ -export class Connection { - protected connected: boolean = false; - connect(): Promise { - this.connected = true; - return Promise.resolve(); +export abstract class Connection { + protected _connected = false; + + protected abstract doConnect(): Promise; + protected abstract doDisconnect(): Promise; + + get connected() { + return this._connected; } - disconnect(): Promise { - this.connected = false; - return Promise.resolve(); + + async connect() { + if (!this._connected) { + await this.doConnect(); + this._connected = true; + } + } + + async disconnect() { + if (!this._connected) { + return; + } + + await this.doDisconnect(); + this._connected = false; } } diff --git a/packages/common/doc-storage/src/storage/doc/doc.ts b/packages/common/doc-storage/src/storage/doc.ts similarity index 92% rename from packages/common/doc-storage/src/storage/doc/doc.ts rename to packages/common/doc-storage/src/storage/doc.ts index b6e22a92b7287..dff8ea129bb2c 100644 --- a/packages/common/doc-storage/src/storage/doc/doc.ts +++ b/packages/common/doc-storage/src/storage/doc.ts @@ -9,47 +9,53 @@ import { UndoManager, } from 'yjs'; -import { Connection } from '../connection'; -import { SingletonLocker } from '../lock'; -import type { - DocDiff, - DocRecord, - DocUpdate, - Editor, - HistoryFilter, -} from './types'; - -export type SpaceType = 'workspace' | 'userspace'; -export interface DocStorageOptions { - spaceType: string; +import { SingletonLocker } from './lock'; +import { Storage, type StorageOptions } from './storage'; + +export interface DocRecord { spaceId: string; + docId: string; + bin: Uint8Array; + timestamp: number; + editor?: string; +} + +export interface DocDiff { + missing: Uint8Array; + state: Uint8Array; + timestamp: number; +} + +export interface DocUpdate { + bin: Uint8Array; + timestamp: number; + editor?: string; +} + +export interface HistoryFilter { + before?: number; + 
limit?: number; +} + +export interface Editor { + name: string; + avatarUrl: string | null; +} + +export interface DocStorageOptions extends StorageOptions { mergeUpdates?: (updates: Uint8Array[]) => Promise | Uint8Array; } export abstract class DocStorage< Opts extends DocStorageOptions = DocStorageOptions, -> extends Connection { +> extends Storage { abstract get name(): string; - public readonly options: Opts; private readonly locker = new SingletonLocker(); protected readonly updatesListeners = new Set< (docId: string, updates: Uint8Array[], timestamp: number) => void >(); - get spaceType() { - return this.options.spaceType; - } - - get spaceId() { - return this.options.spaceId; - } - - constructor(options: Opts) { - super(); - this.options = options; - } - // REGION: open apis /** * Tell a binary is empty yjs binary or not. diff --git a/packages/common/doc-storage/src/storage/doc/index.ts b/packages/common/doc-storage/src/storage/doc/index.ts deleted file mode 100644 index 1fa447519627c..0000000000000 --- a/packages/common/doc-storage/src/storage/doc/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export * from './doc'; -export * from './types'; diff --git a/packages/common/doc-storage/src/storage/doc/types.ts b/packages/common/doc-storage/src/storage/doc/types.ts deleted file mode 100644 index 366eb280e4a25..0000000000000 --- a/packages/common/doc-storage/src/storage/doc/types.ts +++ /dev/null @@ -1,29 +0,0 @@ -export interface DocRecord { - spaceId: string; - docId: string; - bin: Uint8Array; - timestamp: number; - editor?: string; -} - -export interface DocDiff { - missing: Uint8Array; - state: Uint8Array; - timestamp: number; -} - -export interface DocUpdate { - bin: Uint8Array; - timestamp: number; - editor?: string; -} - -export interface HistoryFilter { - before?: number; - limit?: number; -} - -export interface Editor { - name: string; - avatarUrl: string | null; -} diff --git a/packages/common/doc-storage/src/storage/index.ts b/packages/common/doc-storage/src/storage/index.ts index d476ae6eb9b92..a1e68d5b2c1db 100644 --- a/packages/common/doc-storage/src/storage/index.ts +++ b/packages/common/doc-storage/src/storage/index.ts @@ -1,2 +1,5 @@ export * from './blob'; +export * from './connection'; export * from './doc'; +export * from './storage'; +export * from './sync'; diff --git a/packages/common/doc-storage/src/storage/storage.ts b/packages/common/doc-storage/src/storage/storage.ts new file mode 100644 index 0000000000000..b2b9c20bfedfe --- /dev/null +++ b/packages/common/doc-storage/src/storage/storage.ts @@ -0,0 +1,24 @@ +import { Connection } from './connection'; + +export type SpaceType = 'workspace' | 'userspace'; + +export interface StorageOptions { + type: SpaceType; + id: string; +} + +export abstract class Storage< + Opts extends StorageOptions = StorageOptions, +> extends Connection { + get spaceType() { + return this.options.type; + } + + get spaceId() { + return this.options.id; + } + + constructor(public readonly options: Opts) { + super(); + } +} diff --git a/packages/common/doc-storage/src/storage/sync.ts b/packages/common/doc-storage/src/storage/sync.ts new file mode 100644 index 0000000000000..a707e6215d7e5 --- /dev/null +++ b/packages/common/doc-storage/src/storage/sync.ts @@ -0,0 +1,26 @@ +import { Storage, type StorageOptions } from './storage'; + +export interface SyncStorageOptions extends StorageOptions {} + +export interface PeerClock { + docId: string; + clock: number; +} + +export abstract class SyncStorage< + Opts extends SyncStorageOptions = 
SyncStorageOptions, +> extends Storage { + abstract getPeerClocks(peer: string): Promise>; + abstract setPeerClock( + peer: string, + docId: string, + clock: number + ): Promise; + + abstract getPeerPushedClocks(peer: string): Promise>; + abstract setPeerPushedClock( + peer: string, + docId: string, + clock: number + ): Promise; +} diff --git a/packages/common/doc-storage/tsconfig.json b/packages/common/doc-storage/tsconfig.json index 4bbd8d0b79dd9..87b2eaa0c96b2 100644 --- a/packages/common/doc-storage/tsconfig.json +++ b/packages/common/doc-storage/tsconfig.json @@ -5,5 +5,13 @@ "composite": true, "noEmit": false, "outDir": "lib" - } + }, + "references": [ + { + "path": "../../frontend/graphql" + }, + { + "path": "../../frontend/native" + } + ] } diff --git a/packages/frontend/core/src/modules/cloud/stores/user-quota.ts b/packages/frontend/core/src/modules/cloud/stores/user-quota.ts index 0413d46dcb8b5..70698060aec26 100644 --- a/packages/frontend/core/src/modules/cloud/stores/user-quota.ts +++ b/packages/frontend/core/src/modules/cloud/stores/user-quota.ts @@ -23,7 +23,7 @@ export class UserQuotaStore extends Store { return { userId: data.currentUser.id, quota: data.currentUser.quota, - used: data.collectAllBlobSizes.size, + used: data.currentUser.quotaUsage.storageQuota, }; } } diff --git a/packages/frontend/core/src/modules/workspace-engine/impls/engine/blob-cloud.ts b/packages/frontend/core/src/modules/workspace-engine/impls/engine/blob-cloud.ts index 3a41218bb2b53..429a431bfe664 100644 --- a/packages/frontend/core/src/modules/workspace-engine/impls/engine/blob-cloud.ts +++ b/packages/frontend/core/src/modules/workspace-engine/impls/engine/blob-cloud.ts @@ -58,7 +58,7 @@ export class CloudBlobStorage implements BlobStorage { query: deleteBlobMutation, variables: { workspaceId: key, - hash: key, + key, }, }); } @@ -70,6 +70,6 @@ export class CloudBlobStorage implements BlobStorage { workspaceId: this.workspaceId, }, }); - return result.listBlobs; + return result.workspace.blobs.map(blob => blob.key); } } diff --git a/packages/frontend/graphql/src/graphql/blob-delete.gql b/packages/frontend/graphql/src/graphql/blob-delete.gql index 790c326465909..ab78a54d88c6a 100644 --- a/packages/frontend/graphql/src/graphql/blob-delete.gql +++ b/packages/frontend/graphql/src/graphql/blob-delete.gql @@ -1,3 +1,7 @@ -mutation deleteBlob($workspaceId: String!, $hash: String!) { - deleteBlob(workspaceId: $workspaceId, hash: $hash) +mutation deleteBlob( + $workspaceId: String! + $key: String! + $permanently: Boolean +) { + deleteBlob(workspaceId: $workspaceId, key: $key, permanently: $permanently) } diff --git a/packages/frontend/graphql/src/graphql/blob-list.gql b/packages/frontend/graphql/src/graphql/blob-list.gql index 67a19c718c1bb..5b314f0cc5cfa 100644 --- a/packages/frontend/graphql/src/graphql/blob-list.gql +++ b/packages/frontend/graphql/src/graphql/blob-list.gql @@ -1,3 +1,10 @@ query listBlobs($workspaceId: String!) { - listBlobs(workspaceId: $workspaceId) + workspace(id: $workspaceId) { + blobs { + key + size + mime + createdAt + } + } } diff --git a/packages/frontend/graphql/src/graphql/blob-release-deleted.gql b/packages/frontend/graphql/src/graphql/blob-release-deleted.gql new file mode 100644 index 0000000000000..d51d0ef7ca68d --- /dev/null +++ b/packages/frontend/graphql/src/graphql/blob-release-deleted.gql @@ -0,0 +1,3 @@ +mutation releaseDeletedBlobs($workspaceId: String!) 
{ + releaseDeletedBlobs(workspaceId: $workspaceId) +} diff --git a/packages/frontend/graphql/src/graphql/index.ts b/packages/frontend/graphql/src/graphql/index.ts index 2bc168d6cf542..e94ec76fdf98e 100644 --- a/packages/frontend/graphql/src/graphql/index.ts +++ b/packages/frontend/graphql/src/graphql/index.ts @@ -47,19 +47,37 @@ export const deleteBlobMutation = { definitionName: 'deleteBlob', containsFile: false, query: ` -mutation deleteBlob($workspaceId: String!, $hash: String!) { - deleteBlob(workspaceId: $workspaceId, hash: $hash) +mutation deleteBlob($workspaceId: String!, $key: String!, $permanently: Boolean) { + deleteBlob(workspaceId: $workspaceId, key: $key, permanently: $permanently) }`, }; export const listBlobsQuery = { id: 'listBlobsQuery' as const, operationName: 'listBlobs', - definitionName: 'listBlobs', + definitionName: 'workspace', containsFile: false, query: ` query listBlobs($workspaceId: String!) { - listBlobs(workspaceId: $workspaceId) + workspace(id: $workspaceId) { + blobs { + key + size + mime + createdAt + } + } +}`, +}; + +export const releaseDeletedBlobsMutation = { + id: 'releaseDeletedBlobsMutation' as const, + operationName: 'releaseDeletedBlobs', + definitionName: 'releaseDeletedBlobs', + containsFile: false, + query: ` +mutation releaseDeletedBlobs($workspaceId: String!) { + releaseDeletedBlobs(workspaceId: $workspaceId) }`, }; @@ -836,7 +854,7 @@ mutation publishPage($workspaceId: String!, $pageId: String!, $mode: PublicPageM export const quotaQuery = { id: 'quotaQuery' as const, operationName: 'quota', - definitionName: 'currentUser,collectAllBlobSizes', + definitionName: 'currentUser', containsFile: false, query: ` query quota { @@ -856,9 +874,9 @@ query quota { memberLimit } } - } - collectAllBlobSizes { - size + quotaUsage { + storageQuota + } } }`, }; diff --git a/packages/frontend/graphql/src/graphql/quota.gql b/packages/frontend/graphql/src/graphql/quota.gql index c0268b6443584..682c2e5bc3550 100644 --- a/packages/frontend/graphql/src/graphql/quota.gql +++ b/packages/frontend/graphql/src/graphql/quota.gql @@ -15,8 +15,8 @@ query quota { memberLimit } } - } - collectAllBlobSizes { - size + quotaUsage { + storageQuota + } } } diff --git a/packages/frontend/graphql/src/schema.ts b/packages/frontend/graphql/src/schema.ts index eaea9072bccd9..b078cad0e181d 100644 --- a/packages/frontend/graphql/src/schema.ts +++ b/packages/frontend/graphql/src/schema.ts @@ -469,6 +469,14 @@ export interface ListUserInput { skip: InputMaybe; } +export interface ListedBlob { + __typename?: 'ListedBlob'; + createdAt: Scalars['String']['output']; + key: Scalars['String']['output']; + mime: Scalars['String']['output']; + size: Scalars['Int']['output']; +} + export interface ManageUserInput { /** User email */ email: InputMaybe; @@ -517,6 +525,7 @@ export interface Mutation { leaveWorkspace: Scalars['Boolean']['output']; publishPage: WorkspacePage; recoverDoc: Scalars['DateTime']['output']; + releaseDeletedBlobs: Scalars['Boolean']['output']; /** Remove user avatar */ removeAvatar: RemoveAvatar; removeWorkspaceFeature: Scalars['Int']['output']; @@ -614,7 +623,9 @@ export interface MutationCreateWorkspaceArgs { } export interface MutationDeleteBlobArgs { - hash: Scalars['String']['input']; + hash: InputMaybe; + key: InputMaybe; + permanently?: Scalars['Boolean']['input']; workspaceId: Scalars['String']['input']; } @@ -655,6 +666,10 @@ export interface MutationRecoverDocArgs { workspaceId: Scalars['String']['input']; } +export interface MutationReleaseDeletedBlobsArgs { + 
workspaceId: Scalars['String']['input']; +} + export interface MutationRemoveWorkspaceFeatureArgs { feature: FeatureType; workspaceId: Scalars['String']['input']; @@ -800,9 +815,7 @@ export enum PublicPageMode { export interface Query { __typename?: 'Query'; - /** @deprecated no more needed */ - checkBlobSize: WorkspaceBlobSizes; - /** @deprecated use `user.storageUsage` instead */ + /** @deprecated use `user.quotaUsage` instead */ collectAllBlobSizes: WorkspaceBlobSizes; /** Get current user */ currentUser: Maybe; @@ -841,11 +854,6 @@ export interface Query { workspaces: Array; } -export interface QueryCheckBlobSizeArgs { - size: Scalars['SafeInt']['input']; - workspaceId: Scalars['String']['input']; -} - export interface QueryErrorArgs { name: ErrorNames; } @@ -1123,6 +1131,11 @@ export interface UserQuotaHumanReadable { storageQuota: Scalars['String']['output']; } +export interface UserQuotaUsage { + __typename?: 'UserQuotaUsage'; + storageQuota: Scalars['SafeInt']['output']; +} + export interface UserSubscription { __typename?: 'UserSubscription'; canceledAt: Maybe; @@ -1169,6 +1182,7 @@ export interface UserType { /** User name */ name: Scalars['String']['output']; quota: Maybe; + quotaUsage: UserQuotaUsage; /** @deprecated use `UserType.subscriptions` */ subscription: Maybe; subscriptions: Array; @@ -1221,7 +1235,7 @@ export interface WorkspaceType { /** Available features of workspace */ availableFeatures: Array; /** List blobs of workspace */ - blobs: Array; + blobs: Array; /** Blobs size of workspace */ blobsSize: Scalars['Int']['output']; /** Workspace created date */ @@ -1311,7 +1325,8 @@ export type AdminServerConfigQuery = { export type DeleteBlobMutationVariables = Exact<{ workspaceId: Scalars['String']['input']; - hash: Scalars['String']['input']; + key: Scalars['String']['input']; + permanently: InputMaybe; }>; export type DeleteBlobMutation = { @@ -1323,7 +1338,28 @@ export type ListBlobsQueryVariables = Exact<{ workspaceId: Scalars['String']['input']; }>; -export type ListBlobsQuery = { __typename?: 'Query'; listBlobs: Array }; +export type ListBlobsQuery = { + __typename?: 'Query'; + workspace: { + __typename?: 'WorkspaceType'; + blobs: Array<{ + __typename?: 'ListedBlob'; + key: string; + size: number; + mime: string; + createdAt: string; + }>; + }; +}; + +export type ReleaseDeletedBlobsMutationVariables = Exact<{ + workspaceId: Scalars['String']['input']; +}>; + +export type ReleaseDeletedBlobsMutation = { + __typename?: 'Mutation'; + releaseDeletedBlobs: boolean; +}; export type SetBlobMutationVariables = Exact<{ workspaceId: Scalars['String']['input']; @@ -2051,8 +2087,8 @@ export type QuotaQuery = { memberLimit: string; }; } | null; + quotaUsage: { __typename?: 'UserQuotaUsage'; storageQuota: number }; } | null; - collectAllBlobSizes: { __typename?: 'WorkspaceBlobSizes'; size: number }; }; export type RecoverDocMutationVariables = Exact<{ @@ -2678,6 +2714,11 @@ export type Mutations = variables: DeleteBlobMutationVariables; response: DeleteBlobMutation; } + | { + name: 'releaseDeletedBlobsMutation'; + variables: ReleaseDeletedBlobsMutationVariables; + response: ReleaseDeletedBlobsMutation; + } | { name: 'setBlobMutation'; variables: SetBlobMutationVariables; diff --git a/packages/frontend/native/index.d.ts b/packages/frontend/native/index.d.ts index bff1c745015f6..cfeb7c3990fd5 100644 --- a/packages/frontend/native/index.d.ts +++ b/packages/frontend/native/index.d.ts @@ -2,7 +2,12 @@ /* eslint-disable */ export declare class DocStorage { constructor(path: string) - 
connect(): Promise + /** + * Initialize the database and run migrations. + * If `migrations` folder is provided, it should be a path to a directory containing SQL migration files. + * If not, it will try to read migrations from './migrations', relative to the current working directory (PWD). + */ + init(migrationsFolder?: string | undefined | null): Promise close(): Promise get isClosed(): Promise /** @@ -18,10 +23,14 @@ export declare class DocStorage { deleteDoc(docId: string): Promise getDocClocks(after?: number | undefined | null): Promise> getBlob(key: string): Promise - setBlob(blob: Blob): Promise + setBlob(blob: SetBlob): Promise deleteBlob(key: string, permanently: boolean): Promise releaseBlobs(): Promise listBlobs(): Promise> + getPeerClocks(peer: string): Promise> + setPeerClock(peer: string, docId: string, clock: Date): Promise + getPeerPushedClocks(peer: string): Promise> + setPeerPushedClock(peer: string, docId: string, clock: Date): Promise } export declare class SqliteConnection { @@ -65,6 +74,8 @@ export interface Blob { key: string data: Buffer mime: string + size: number + createdAt: Date } export interface BlobRow { @@ -98,10 +109,18 @@ export interface InsertRow { export interface ListedBlob { key: string size: number + mime: string + createdAt: Date } export declare function mintChallengeResponse(resource: string, bits?: number | undefined | null): Promise +export interface SetBlob { + key: string + data: Buffer + mime: string +} + export interface UpdateRow { id: number timestamp: Date diff --git a/packages/frontend/native/migrations/20240929082254_init.sql b/packages/frontend/native/migrations/20240929082254_init.sql index 619cb49b5044e..657814aa23254 100644 --- a/packages/frontend/native/migrations/20240929082254_init.sql +++ b/packages/frontend/native/migrations/20240929082254_init.sql @@ -1,21 +1,25 @@ +CREATE TABLE "v2_meta" ( + space_id VARCHAR PRIMARY KEY NOT NULL +); + CREATE TABLE "v2_snapshots" ( - doc_id TEXT PRIMARY KEY NOT NULL, + doc_id VARCHAR PRIMARY KEY NOT NULL, data BLOB NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, updated_at TIMESTAMP NOT NULL ); -CREATE INDEX snapshots_doc_id ON v2_snapshots(doc_id); CREATE TABLE "v2_updates" ( id INTEGER PRIMARY KEY AUTOINCREMENT, - doc_id TEXT NOT NULL, + doc_id VARCHAR NOT NULL, data BLOB NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ); -CREATE INDEX updates_doc_id ON v2_updates (doc_id); +CREATE INDEX v2_updates_doc_id ON v2_updates (doc_id); +CREATE UNIQUE INDEX v2_updates_doc_id_created_at ON v2_updates (doc_id, created_at); CREATE TABLE "v2_clocks" ( - doc_id TEXT PRIMARY KEY NOT NULL, + doc_id VARCHAR PRIMARY KEY NOT NULL, timestamp TIMESTAMP NOT NULL ); @@ -26,4 +30,12 @@ CREATE TABLE "v2_blobs" ( size INTEGER NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, deleted_at TIMESTAMP +); + +CREATE TABLE "v2_peer_clocks" ( + peer VARCHAR NOT NULL, + doc_id VARCHAR NOT NULL, + clock TIMESTAMP NOT NULL, + pushed_clock TIMESTAMP NOT NULL, + PRIMARY KEY (peer, doc_id) ); \ No newline at end of file diff --git a/packages/frontend/native/src/sqlite/doc_storage/blob.rs b/packages/frontend/native/src/sqlite/doc_storage/blob.rs new file mode 100644 index 0000000000000..5e7ab07b3e380 --- /dev/null +++ b/packages/frontend/native/src/sqlite/doc_storage/blob.rs @@ -0,0 +1,199 @@ +use super::{storage::SqliteDocStorage, Blob, ListedBlob, SetBlob}; + +type Result = std::result::Result; + +impl SqliteDocStorage { + pub async fn get_blob(&self, key: String) -> Result> { 
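+ // Blobs soft-deleted via `delete_blob(key, false)` have `deleted_at` set and are treated as missing here, yielding `None`.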
+ sqlx::query_as!( + Blob, + "SELECT key, data, size, mime, created_at FROM v2_blobs WHERE key = ? AND deleted_at IS NULL", + key + ) + .fetch_optional(&self.pool) + .await + } + + pub async fn set_blob(&self, blob: SetBlob) -> Result<()> { + sqlx::query( + r#" + INSERT INTO v2_blobs (key, data, mime, size) + VALUES ($1, $2, $3, $4) + ON CONFLICT(key) + DO UPDATE SET data=$2, mime=$3, size=$4, deleted_at=NULL;"#, + ) + .bind(blob.key) + .bind(blob.data.as_ref()) + .bind(blob.mime) + .bind(blob.data.len() as i64) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn delete_blob(&self, key: String, permanently: bool) -> Result<()> { + if permanently { + sqlx::query("DELETE FROM v2_blobs WHERE key = ?") + .bind(&key) + .execute(&self.pool) + .await?; + } else { + sqlx::query("UPDATE v2_blobs SET deleted_at = CURRENT_TIMESTAMP WHERE key = ?") + .bind(&key) + .execute(&self.pool) + .await?; + } + + Ok(()) + } + + pub async fn release_blobs(&self) -> Result<()> { + sqlx::query("DELETE FROM v2_blobs WHERE deleted_at IS NOT NULL;") + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn list_blobs(&self) -> Result> { + sqlx::query_as!( + ListedBlob, + "SELECT key, size, mime, created_at FROM v2_blobs WHERE deleted_at IS NULL ORDER BY created_at DESC;" + ) + .fetch_all(&self.pool) + .await + } +} + +#[cfg(test)] +mod tests { + use napi::bindgen_prelude::Buffer; + use sqlx::Row; + + use super::*; + + async fn get_storage() -> SqliteDocStorage { + let storage = SqliteDocStorage::new(":memory:".to_string()); + storage.connect().await.unwrap(); + storage.test_migrate().await.unwrap(); + + storage + } + + #[tokio::test] + async fn delete_blob() { + let storage = get_storage().await; + + for i in 1..5u32 { + storage + .set_blob(SetBlob { + key: format!("test_{}", i), + data: Buffer::from(vec![0, 0]), + mime: "text/plain".to_string(), + }) + .await + .unwrap(); + } + + let result = storage.get_blob("test_1".to_string()).await.unwrap(); + + assert!(result.is_some()); + + storage + .delete_blob("test_".to_string(), false) + .await + .unwrap(); + + let result = storage.get_blob("test".to_string()).await.unwrap(); + assert!(result.is_none()); + + storage + .delete_blob("test_2".to_string(), true) + .await + .unwrap(); + + let result = storage.get_blob("test".to_string()).await.unwrap(); + assert!(result.is_none()); + } + + #[tokio::test] + async fn list_blobs() { + let storage = get_storage().await; + + let blobs = storage.list_blobs().await.unwrap(); + + assert_eq!(blobs.len(), 0); + + for i in 1..5u32 { + storage + .set_blob(SetBlob { + key: format!("test_{}", i), + data: Buffer::from(vec![0, 0]), + mime: "text/plain".to_string(), + }) + .await + .unwrap(); + } + + let blobs = storage.list_blobs().await.unwrap(); + + assert_eq!(blobs.len(), 4); + assert_eq!( + blobs.iter().map(|b| b.key.as_str()).collect::>(), + vec!["test_1", "test_2", "test_3", "test_4"] + ); + + storage + .delete_blob("test_2".to_string(), false) + .await + .unwrap(); + + storage + .delete_blob("test_3".to_string(), true) + .await + .unwrap(); + + let query = sqlx::query("SELECT COUNT(*) as len FROM v2_blobs;") + .fetch_one(&storage.pool) + .await + .unwrap(); + + assert_eq!(query.get::("len"), 3); + + let blobs = storage.list_blobs().await.unwrap(); + assert_eq!(blobs.len(), 2); + assert_eq!( + blobs.iter().map(|b| b.key.as_str()).collect::>(), + vec!["test_1", "test_4"] + ); + } + + #[tokio::test] + async fn release_blobs() { + let storage = get_storage().await; + + for i in 1..5u32 { + storage + .set_blob(SetBlob 
{ + key: format!("test_{}", i), + data: Buffer::from(vec![0, 0]), + mime: "text/plain".to_string(), + }) + .await + .unwrap(); + } + + storage + .delete_blob("test_2".to_string(), false) + .await + .unwrap(); + storage.release_blobs().await.unwrap(); + + let query = sqlx::query("SELECT COUNT(*) as len FROM v2_blobs;") + .fetch_one(&storage.pool) + .await + .unwrap(); + + assert_eq!(query.get::("len"), 3); + } +} diff --git a/packages/frontend/native/src/sqlite/doc_storage/doc.rs b/packages/frontend/native/src/sqlite/doc_storage/doc.rs new file mode 100644 index 0000000000000..98de5637d89a3 --- /dev/null +++ b/packages/frontend/native/src/sqlite/doc_storage/doc.rs @@ -0,0 +1,339 @@ +use chrono::{DateTime, NaiveDateTime}; +use sqlx::{QueryBuilder, Row}; + +use super::storage::{Result, SqliteDocStorage}; +use super::{DocClock, DocRecord, DocUpdate}; + +impl SqliteDocStorage { + pub async fn push_updates, Updates: AsRef<[Update]>>( + &self, + doc_id: String, + updates: Updates, + ) -> Result { + let mut cnt = 0; + + for chunk in updates.as_ref().chunks(10) { + self.batch_push_updates(&doc_id, chunk).await?; + cnt += chunk.len() as u32; + } + + Ok(cnt) + } + + pub async fn get_doc_snapshot(&self, doc_id: String) -> Result> { + sqlx::query_as!( + DocRecord, + "SELECT doc_id, data, updated_at as timestamp FROM v2_snapshots WHERE doc_id = ?", + doc_id + ) + .fetch_optional(&self.pool) + .await + } + + pub async fn set_doc_snapshot(&self, snapshot: DocRecord) -> Result { + let result = sqlx::query( + r#" + INSERT INTO v2_snapshots (doc_id, data, updated_at) + VALUES ($1, $2, $3) + ON CONFLICT(doc_id) + DO UPDATE SET data=$2, updated_at=$3 + WHERE updated_at <= $3;"#, + ) + .bind(snapshot.doc_id) + .bind(snapshot.data.as_ref()) + .bind(snapshot.timestamp) + .execute(&self.pool) + .await?; + + Ok(result.rows_affected() == 1) + } + + pub async fn get_doc_updates(&self, doc_id: String) -> Result> { + sqlx::query_as!( + DocUpdate, + "SELECT doc_id, created_at, data FROM v2_updates WHERE doc_id = ?", + doc_id + ) + .fetch_all(&self.pool) + .await + } + + pub async fn mark_updates_merged( + &self, + doc_id: String, + updates: Vec, + ) -> Result { + let mut qb = QueryBuilder::new("DELETE FROM v2_updates"); + + qb.push(" WHERE doc_id = "); + qb.push_bind(doc_id); + qb.push(" AND created_at IN ("); + let mut separated = qb.separated(", "); + updates.iter().for_each(|update| { + separated.push_bind(update); + }); + qb.push(");"); + + let query = qb.build(); + + let result = query.execute(&self.pool).await?; + + Ok(result.rows_affected() as u32) + } + + async fn batch_push_updates>( + &self, + doc_id: &str, + updates: &[Update], + ) -> Result<()> { + let mut timestamp = chrono::Utc::now().timestamp_micros(); + + let mut qb = QueryBuilder::new("INSERT INTO v2_updates (doc_id, data, created_at) "); + qb.push_values(updates, |mut b, update| { + timestamp += 1; + b.push_bind(doc_id).push_bind(update.as_ref()).push_bind( + DateTime::from_timestamp_millis(timestamp) + .unwrap() + .naive_utc(), + ); + }); + + let query = qb.build(); + + let mut tx = self.pool.begin().await?; + query.execute(&mut *tx).await?; + + sqlx::query( + r#" + INSERT INTO v2_clocks (doc_id, timestamp) VALUES ($1, $2) + ON CONFLICT(doc_id) + DO UPDATE SET timestamp=$2;"#, + ) + .bind(doc_id) + .bind(DateTime::from_timestamp_millis(timestamp).unwrap().to_utc()) + .execute(&mut *tx) + .await?; + + tx.commit().await + } + + pub async fn delete_doc(&self, doc_id: String) -> Result<()> { + let mut tx = self.pool.begin().await?; + + 
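// Remove the doc's queued updates, its snapshot and its clock together in a single transaction. +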
sqlx::query("DELETE FROM updates WHERE doc_id = ?;") + .bind(&doc_id) + .execute(&mut *tx) + .await?; + + sqlx::query("DELETE FROM snapshots WHERE doc_id = ?;") + .bind(&doc_id) + .execute(&mut *tx) + .await?; + + sqlx::query("DELETE FROM clocks WHERE doc_id = ?;") + .bind(&doc_id) + .execute(&mut *tx) + .await?; + + tx.commit().await + } + + pub async fn get_doc_clocks(&self, after: Option) -> Result> { + let query = if let Some(after) = after { + sqlx::query("SELECT doc_id, timestamp FROM v2_clocks WHERE timestamp > $1") + .bind(DateTime::from_timestamp_millis(after).unwrap().naive_utc()) + } else { + sqlx::query("SELECT doc_id, timestamp FROM v2_clocks") + }; + + let clocks = query.fetch_all(&self.pool).await?; + + Ok( + clocks + .iter() + .map(|row| DocClock { + doc_id: row.get("doc_id"), + timestamp: row.get("timestamp"), + }) + .collect(), + ) + } +} + +#[cfg(test)] +mod tests { + use chrono::Utc; + use napi::bindgen_prelude::Buffer; + + use super::*; + + async fn get_storage() -> SqliteDocStorage { + let storage = SqliteDocStorage::new(":memory:".to_string()); + storage.connect().await.unwrap(); + storage.test_migrate().await.unwrap(); + + storage + } + + #[tokio::test] + async fn init_tables() { + let storage = get_storage().await; + + sqlx::query("INSERT INTO v2_snapshots (doc_id, data, updated_at) VALUES ($1, $2, $3);") + .bind("test") + .bind(vec![0, 0]) + .bind(Utc::now()) + .execute(&storage.pool) + .await + .unwrap(); + + sqlx::query_as!( + DocRecord, + "SELECT doc_id, data, updated_at as timestamp FROM v2_snapshots WHERE doc_id = 'test';" + ) + .fetch_one(&storage.pool) + .await + .unwrap(); + } + + #[tokio::test] + async fn push_updates() { + let storage = get_storage().await; + + let updates = vec![vec![0, 0], vec![0, 1], vec![1, 0], vec![1, 1]]; + + storage + .push_updates("test".to_string(), &updates) + .await + .unwrap(); + + let result = storage.get_doc_updates("test".to_string()).await.unwrap(); + + assert_eq!(result.len(), 4); + assert_eq!( + result.iter().map(|u| u.data.as_ref()).collect::>(), + updates + ); + } + + #[tokio::test] + async fn get_doc_snapshot() { + let storage = get_storage().await; + + let none = storage.get_doc_snapshot("test".to_string()).await.unwrap(); + + assert!(none.is_none()); + + let snapshot = DocRecord { + doc_id: "test".to_string(), + data: Buffer::from(vec![0, 0]), + timestamp: Utc::now().naive_utc(), + }; + + storage.set_doc_snapshot(snapshot).await.unwrap(); + + let result = storage.get_doc_snapshot("test".to_string()).await.unwrap(); + + assert!(result.is_some()); + assert_eq!(result.unwrap().data.as_ref(), vec![0, 0]); + } + + #[tokio::test] + async fn set_doc_snapshot() { + let storage = get_storage().await; + + let snapshot = DocRecord { + doc_id: "test".to_string(), + data: Buffer::from(vec![0, 0]), + timestamp: Utc::now().naive_utc(), + }; + + storage.set_doc_snapshot(snapshot).await.unwrap(); + + let result = storage.get_doc_snapshot("test".to_string()).await.unwrap(); + + assert!(result.is_some()); + assert_eq!(result.unwrap().data.as_ref(), vec![0, 0]); + + let snapshot = DocRecord { + doc_id: "test".to_string(), + data: Buffer::from(vec![0, 1]), + timestamp: DateTime::from_timestamp_millis(Utc::now().timestamp_millis() - 1000) + .unwrap() + .naive_utc(), + }; + + // can't update because it's tempstamp is older + storage.set_doc_snapshot(snapshot).await.unwrap(); + + let result = storage.get_doc_snapshot("test".to_string()).await.unwrap(); + + assert!(result.is_some()); + assert_eq!(result.unwrap().data.as_ref(), vec![0, 0]); 
+ } + + #[tokio::test] + async fn get_doc_clocks() { + let storage = get_storage().await; + + let clocks = storage.get_doc_clocks(None).await.unwrap(); + + assert_eq!(clocks.len(), 0); + + // where is join_all()? + for i in 1..5u32 { + storage + .push_updates(format!("test_{i}"), vec![vec![0, 0]]) + .await + .unwrap(); + } + + let clocks = storage.get_doc_clocks(None).await.unwrap(); + + assert_eq!(clocks.len(), 4); + assert_eq!( + clocks.iter().map(|c| c.doc_id.as_str()).collect::>(), + vec!["test_1", "test_2", "test_3", "test_4"] + ); + + let clocks = storage + .get_doc_clocks(Some(Utc::now().timestamp_millis())) + .await + .unwrap(); + + assert_eq!(clocks.len(), 0); + } + + #[tokio::test] + async fn mark_updates_merged() { + let storage = get_storage().await; + + storage + .push_updates( + "test".to_string(), + vec![vec![0, 0], vec![0, 1], vec![1, 0], vec![1, 1]], + ) + .await + .unwrap(); + + let updates = storage.get_doc_updates("test".to_string()).await.unwrap(); + + let result = storage + .mark_updates_merged( + "test".to_string(), + updates + .iter() + .skip(1) + .map(|u| u.created_at) + .collect::>(), + ) + .await + .unwrap(); + + assert_eq!(result, 3); + + let updates = storage.get_doc_updates("test".to_string()).await.unwrap(); + + assert_eq!(updates.len(), 1); + } +} diff --git a/packages/frontend/native/src/sqlite/doc_storage/mod.rs b/packages/frontend/native/src/sqlite/doc_storage/mod.rs index 288ec7f309e7b..209f6674b1260 100644 --- a/packages/frontend/native/src/sqlite/doc_storage/mod.rs +++ b/packages/frontend/native/src/sqlite/doc_storage/mod.rs @@ -1,4 +1,9 @@ +mod blob; +mod doc; mod storage; +mod sync; + +use std::path::PathBuf; use chrono::NaiveDateTime; use napi::bindgen_prelude::{Buffer, Uint8Array}; @@ -28,17 +33,28 @@ pub struct DocClock { pub timestamp: NaiveDateTime, } +#[napi(object)] +pub struct SetBlob { + pub key: String, + pub data: Buffer, + pub mime: String, +} + #[napi(object)] pub struct Blob { + pub key: String, + pub data: Buffer, + pub mime: String, + pub size: i64, + pub created_at: NaiveDateTime, } #[napi(object)] pub struct ListedBlob { pub key: String, pub size: i64, + pub mime: String, + pub created_at: NaiveDateTime, } #[napi] @@ -56,8 +72,20 @@ impl DocStorage { } #[napi] - pub async fn connect(&self) -> napi::Result<()> { - self.storage.connect().await.map_err(map_err) + /// Initialize the database and run migrations. + /// If `migrations` folder is provided, it should be a path to a directory containing SQL migration files. + /// If not, it will try to read migrations from './migrations', relative to the current working directory (PWD). 
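+ /// For example, from JS: `const storage = new DocStorage(path); await storage.init();`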
+ pub async fn init(&self, migrations_folder: Option) -> napi::Result<()> { + self.storage.connect().await.map_err(map_err)?; + self + .storage + .migrate( + migrations_folder + .map(PathBuf::from) + .unwrap_or_else(|| std::env::current_dir().unwrap().join("migrations")), + ) + .await + .map_err(map_err) } #[napi] @@ -139,7 +167,7 @@ impl DocStorage { } #[napi] - pub async fn set_blob(&self, blob: Blob) -> napi::Result<()> { + pub async fn set_blob(&self, blob: SetBlob) -> napi::Result<()> { self.storage.set_blob(blob).await.map_err(map_err) } @@ -161,4 +189,46 @@ impl DocStorage { pub async fn list_blobs(&self) -> napi::Result> { self.storage.list_blobs().await.map_err(map_err) } + + #[napi] + pub async fn get_peer_clocks(&self, peer: String) -> napi::Result> { + self.storage.get_peer_clocks(peer).await.map_err(map_err) + } + + #[napi] + pub async fn set_peer_clock( + &self, + peer: String, + doc_id: String, + clock: NaiveDateTime, + ) -> napi::Result<()> { + self + .storage + .set_peer_clock(peer, doc_id, clock) + .await + .map_err(map_err) + } + + #[napi] + pub async fn get_peer_pushed_clocks(&self, peer: String) -> napi::Result> { + self + .storage + .get_peer_pushed_clocks(peer) + .await + .map_err(map_err) + } + + #[napi] + pub async fn set_peer_pushed_clock( + &self, + peer: String, + doc_id: String, + clock: NaiveDateTime, + ) -> napi::Result<()> { + self + .storage + .set_peer_pushed_clock(peer, doc_id, clock) + .await + .map_err(map_err) + } } diff --git a/packages/frontend/native/src/sqlite/doc_storage/storage.rs b/packages/frontend/native/src/sqlite/doc_storage/storage.rs index 26661bc18bc67..96771b0ac672d 100644 --- a/packages/frontend/native/src/sqlite/doc_storage/storage.rs +++ b/packages/frontend/native/src/sqlite/doc_storage/storage.rs @@ -1,16 +1,15 @@ -use chrono::{DateTime, NaiveDateTime}; +use std::path::PathBuf; + use sqlx::{ migrate::{MigrateDatabase, Migrator}, sqlite::{Sqlite, SqliteConnectOptions, SqlitePoolOptions}, - ConnectOptions, Pool, QueryBuilder, Row, + ConnectOptions, Pool, }; -use super::{Blob, DocClock, DocRecord, DocUpdate, ListedBlob}; - -type Result = std::result::Result; +pub type Result = std::result::Result; pub struct SqliteDocStorage { - pool: Pool, + pub pool: Pool, path: String, } @@ -45,241 +44,36 @@ impl SqliteDocStorage { Sqlite::create_database(&self.path).await?; }; - let migrations = std::env::current_dir().unwrap().join("migrations"); - let migrator = Migrator::new(migrations).await?; - migrator.run(&self.pool).await?; - Ok(()) } - pub async fn close(&self) { - self.pool.close().await - } - - pub fn is_closed(&self) -> bool { - self.pool.is_closed() - } - - pub async fn push_updates, Updates: AsRef<[Update]>>( - &self, - doc_id: String, - updates: Updates, - ) -> Result { - let mut cnt = 0; - - for chunk in updates.as_ref().chunks(10) { - self.batch_push_updates(&doc_id, chunk).await?; - cnt += chunk.len() as u32; - } - - Ok(cnt) - } - - pub async fn get_doc_snapshot(&self, doc_id: String) -> Result> { - sqlx::query_as!( - DocRecord, - "SELECT doc_id, data, updated_at as timestamp FROM v2_snapshots WHERE doc_id = ?", - doc_id - ) - .fetch_optional(&self.pool) - .await - } - - pub async fn set_doc_snapshot(&self, snapshot: DocRecord) -> Result { - let result = sqlx::query( - r#" - INSERT INTO v2_snapshots (doc_id, data, updated_at) - VALUES ($1, $2, $3) - ON CONFLICT(doc_id) - DO UPDATE SET data=$2, updated_at=$3 - WHERE updated_at <= $3;"#, - ) - .bind(snapshot.doc_id) - .bind(snapshot.data.as_ref()) - .bind(snapshot.timestamp) - 
.execute(&self.pool) - .await?; - - Ok(result.rows_affected() == 1) - } - - pub async fn get_doc_updates(&self, doc_id: String) -> Result> { - sqlx::query_as!( - DocUpdate, - "SELECT doc_id, created_at, data FROM v2_updates WHERE doc_id = ?", - doc_id - ) - .fetch_all(&self.pool) - .await - } - - pub async fn mark_updates_merged( - &self, - doc_id: String, - updates: Vec, - ) -> Result { - let mut qb = QueryBuilder::new("DELETE FROM v2_updates"); - - qb.push(" WHERE doc_id = "); - qb.push_bind(doc_id); - qb.push(" AND created_at IN ("); - let mut separated = qb.separated(", "); - updates.iter().for_each(|update| { - separated.push_bind(update); - }); - qb.push(");"); - - let query = qb.build(); - - let result = query.execute(&self.pool).await?; - - Ok(result.rows_affected() as u32) - } - - async fn batch_push_updates>( - &self, - doc_id: &str, - updates: &[Update], - ) -> Result<()> { - let mut timestamp = chrono::Utc::now().timestamp_micros(); - - let mut qb = QueryBuilder::new("INSERT INTO v2_updates (doc_id, data, created_at) "); - qb.push_values(updates, |mut b, update| { - timestamp += 1; - b.push_bind(doc_id).push_bind(update.as_ref()).push_bind( - DateTime::from_timestamp_millis(timestamp) - .unwrap() - .naive_utc(), - ); - }); - - let query = qb.build(); - - let mut tx = self.pool.begin().await?; - query.execute(&mut *tx).await?; - - sqlx::query( - r#" - INSERT INTO v2_clocks (doc_id, timestamp) VALUES ($1, $2) - ON CONFLICT(doc_id) - DO UPDATE SET timestamp=$2;"#, - ) - .bind(doc_id) - .bind(DateTime::from_timestamp_millis(timestamp).unwrap().to_utc()) - .execute(&mut *tx) - .await?; - - tx.commit().await - } - - pub async fn delete_doc(&self, doc_id: String) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query("DELETE FROM updates WHERE doc_id = ?;") - .bind(&doc_id) - .execute(&mut *tx) - .await?; - - sqlx::query("DELETE FROM snapshots WHERE doc_id = ?;") - .bind(&doc_id) - .execute(&mut *tx) - .await?; - - sqlx::query("DELETE FROM clocks WHERE doc_id = ?;") - .bind(&doc_id) - .execute(&mut *tx) - .await?; - - tx.commit().await - } - - pub async fn get_doc_clocks(&self, after: Option) -> Result> { - let query = if let Some(after) = after { - sqlx::query("SELECT doc_id, timestamp FROM v2_clocks WHERE timestamp > $1") - .bind(DateTime::from_timestamp_millis(after).unwrap().naive_utc()) - } else { - sqlx::query("SELECT doc_id, timestamp FROM v2_clocks") - }; - - let clocks = query.fetch_all(&self.pool).await?; - - Ok( - clocks - .iter() - .map(|row| DocClock { - doc_id: row.get("doc_id"), - timestamp: row.get("timestamp"), - }) - .collect(), - ) - } - - pub async fn get_blob(&self, key: String) -> Result> { - sqlx::query_as!( - Blob, - "SELECT key, data, mime FROM v2_blobs WHERE key = ? 
AND deleted_at IS NULL", - key - ) - .fetch_optional(&self.pool) - .await - } - - pub async fn set_blob(&self, blob: Blob) -> Result<()> { - sqlx::query( - r#" - INSERT INTO v2_blobs (key, data, mime, size) - VALUES ($1, $2, $3, $4) - ON CONFLICT(key) - DO UPDATE SET data=$2, mime=$3, size=$4, deleted_at=NULL;"#, - ) - .bind(blob.key) - .bind(blob.data.as_ref()) - .bind(blob.mime) - .bind(blob.data.len() as i64) - .execute(&self.pool) - .await?; + #[cfg(test)] + pub async fn test_migrate(&self) -> Result<()> { + let migrator = Migrator::new(std::env::current_dir().unwrap().join("migrations")).await?; + migrator.run(&self.pool).await?; Ok(()) } - pub async fn delete_blob(&self, key: String, permanently: bool) -> Result<()> { - if permanently { - sqlx::query("DELETE FROM v2_blobs WHERE key = ?") - .bind(&key) - .execute(&self.pool) - .await?; - } else { - sqlx::query("UPDATE v2_blobs SET deleted_at = CURRENT_TIMESTAMP WHERE key = ?") - .bind(&key) - .execute(&self.pool) - .await?; - } + pub async fn migrate(&self, migrations: PathBuf) -> Result<()> { + let migrator = Migrator::new(migrations.as_path()).await?; + migrator.run(&self.pool).await?; Ok(()) } - pub async fn release_blobs(&self) -> Result<()> { - sqlx::query("DELETE FROM v2_blobs WHERE deleted_at IS NOT NULL;") - .execute(&self.pool) - .await?; - - Ok(()) + pub async fn close(&self) { + self.pool.close().await } - pub async fn list_blobs(&self) -> Result> { - sqlx::query_as!( - ListedBlob, - "SELECT key, size FROM v2_blobs WHERE deleted_at IS NULL ORDER BY created_at DESC;" - ) - .fetch_all(&self.pool) - .await + pub fn is_closed(&self) -> bool { + self.pool.is_closed() } - /** - * Flush the WAL file to the database file. - * See https://www.sqlite.org/pragma.html#pragma_wal_checkpoint:~:text=PRAGMA%20schema.wal_checkpoint%3B - */ - + /// + /// Flush the WAL file to the database file. 
+ /// See https://www.sqlite.org/pragma.html#pragma_wal_checkpoint:~:text=PRAGMA%20schema.wal_checkpoint%3B + /// pub async fn checkpoint(&self) -> Result<()> { sqlx::query("PRAGMA wal_checkpoint(FULL);") .execute(&self.pool) @@ -291,14 +85,12 @@ impl SqliteDocStorage { #[cfg(test)] mod tests { - use chrono::Utc; - use napi::bindgen_prelude::Buffer; - use super::*; async fn get_storage() -> SqliteDocStorage { let storage = SqliteDocStorage::new(":memory:".to_string()); storage.connect().await.unwrap(); + storage.test_migrate().await.unwrap(); storage } @@ -307,296 +99,17 @@ mod tests { async fn init_tables() { let storage = get_storage().await; - sqlx::query("INSERT INTO v2_snapshots (doc_id, data, updated_at) VALUES ($1, $2, $3);") + sqlx::query("INSERT INTO v2_meta (space_id) VALUES ($1);") .bind("test") - .bind(vec![0, 0]) - .bind(Utc::now()) .execute(&storage.pool) .await .unwrap(); - sqlx::query_as!( - DocRecord, - "SELECT doc_id, data, updated_at as timestamp FROM v2_snapshots WHERE doc_id = 'test';" - ) - .fetch_one(&storage.pool) - .await - .unwrap(); - } - - #[tokio::test] - async fn push_updates() { - let storage = get_storage().await; - - let updates = vec![vec![0, 0], vec![0, 1], vec![1, 0], vec![1, 1]]; - - storage - .push_updates("test".to_string(), &updates) - .await - .unwrap(); - - let result = storage.get_doc_updates("test".to_string()).await.unwrap(); - - assert_eq!(result.len(), 4); - assert_eq!( - result.iter().map(|u| u.data.as_ref()).collect::>(), - updates - ); - } - - #[tokio::test] - async fn get_doc_snapshot() { - let storage = get_storage().await; - - let none = storage.get_doc_snapshot("test".to_string()).await.unwrap(); - - assert!(none.is_none()); - - let snapshot = DocRecord { - doc_id: "test".to_string(), - data: Buffer::from(vec![0, 0]), - timestamp: Utc::now().naive_utc(), - }; - - storage.set_doc_snapshot(snapshot).await.unwrap(); - - let result = storage.get_doc_snapshot("test".to_string()).await.unwrap(); - - assert!(result.is_some()); - assert_eq!(result.unwrap().data.as_ref(), vec![0, 0]); - } - - #[tokio::test] - async fn set_doc_snapshot() { - let storage = get_storage().await; - - let snapshot = DocRecord { - doc_id: "test".to_string(), - data: Buffer::from(vec![0, 0]), - timestamp: Utc::now().naive_utc(), - }; - - storage.set_doc_snapshot(snapshot).await.unwrap(); - - let result = storage.get_doc_snapshot("test".to_string()).await.unwrap(); - - assert!(result.is_some()); - assert_eq!(result.unwrap().data.as_ref(), vec![0, 0]); - - let snapshot = DocRecord { - doc_id: "test".to_string(), - data: Buffer::from(vec![0, 1]), - timestamp: DateTime::from_timestamp_millis(Utc::now().timestamp_millis() - 1000) - .unwrap() - .naive_utc(), - }; - - // can't update because it's tempstamp is older - storage.set_doc_snapshot(snapshot).await.unwrap(); - - let result = storage.get_doc_snapshot("test".to_string()).await.unwrap(); - - assert!(result.is_some()); - assert_eq!(result.unwrap().data.as_ref(), vec![0, 0]); - } - - #[tokio::test] - async fn get_doc_clocks() { - let storage = get_storage().await; - - let clocks = storage.get_doc_clocks(None).await.unwrap(); - - assert_eq!(clocks.len(), 0); - - // where is join_all()? 
- for i in 1..5u32 { - storage - .push_updates(format!("test_{i}"), vec![vec![0, 0]]) - .await - .unwrap(); - } - - let clocks = storage.get_doc_clocks(None).await.unwrap(); - - assert_eq!(clocks.len(), 4); - assert_eq!( - clocks.iter().map(|c| c.doc_id.as_str()).collect::>(), - vec!["test_1", "test_2", "test_3", "test_4"] - ); - - let clocks = storage - .get_doc_clocks(Some(Utc::now().timestamp_millis())) - .await - .unwrap(); - - assert_eq!(clocks.len(), 0); - } - - #[tokio::test] - async fn mark_updates_merged() { - let storage = get_storage().await; - - storage - .push_updates( - "test".to_string(), - vec![vec![0, 0], vec![0, 1], vec![1, 0], vec![1, 1]], - ) - .await - .unwrap(); - - let updates = storage.get_doc_updates("test".to_string()).await.unwrap(); - - let result = storage - .mark_updates_merged( - "test".to_string(), - updates - .iter() - .skip(1) - .map(|u| u.created_at) - .collect::>(), - ) - .await - .unwrap(); - - assert_eq!(result, 3); - - let updates = storage.get_doc_updates("test".to_string()).await.unwrap(); - - assert_eq!(updates.len(), 1); - } - - #[tokio::test] - async fn set_blob() { - let storage = get_storage().await; - - let blob = Blob { - key: "test".to_string(), - data: Buffer::from(vec![0, 0]), - mime: "text/plain".to_string(), - }; - - storage.set_blob(blob).await.unwrap(); - - let result = storage.get_blob("test".to_string()).await.unwrap(); - - assert!(result.is_some()); - assert_eq!(result.unwrap().data.as_ref(), vec![0, 0]); - } - - #[tokio::test] - async fn delete_blob() { - let storage = get_storage().await; - - for i in 1..5u32 { - storage - .set_blob(Blob { - key: format!("test_{}", i), - data: Buffer::from(vec![0, 0]), - mime: "text/plain".to_string(), - }) - .await - .unwrap(); - } - - let result = storage.get_blob("test_1".to_string()).await.unwrap(); - - assert!(result.is_some()); - - storage - .delete_blob("test_".to_string(), false) - .await - .unwrap(); - - let result = storage.get_blob("test".to_string()).await.unwrap(); - assert!(result.is_none()); - - storage - .delete_blob("test_2".to_string(), true) - .await - .unwrap(); - - let result = storage.get_blob("test".to_string()).await.unwrap(); - assert!(result.is_none()); - } - - #[tokio::test] - async fn list_blobs() { - let storage = get_storage().await; - - let blobs = storage.list_blobs().await.unwrap(); - - assert_eq!(blobs.len(), 0); - - for i in 1..5u32 { - storage - .set_blob(Blob { - key: format!("test_{}", i), - data: Buffer::from(vec![0, 0]), - mime: "text/plain".to_string(), - }) - .await - .unwrap(); - } - - let blobs = storage.list_blobs().await.unwrap(); - - assert_eq!(blobs.len(), 4); - assert_eq!( - blobs.iter().map(|b| b.key.as_str()).collect::>(), - vec!["test_1", "test_2", "test_3", "test_4"] - ); - - storage - .delete_blob("test_2".to_string(), false) - .await - .unwrap(); - - storage - .delete_blob("test_3".to_string(), true) - .await - .unwrap(); - - let query = sqlx::query("SELECT COUNT(*) as len FROM v2_blobs;") - .fetch_one(&storage.pool) - .await - .unwrap(); - - assert_eq!(query.get::("len"), 3); - - let blobs = storage.list_blobs().await.unwrap(); - assert_eq!(blobs.len(), 2); - assert_eq!( - blobs.iter().map(|b| b.key.as_str()).collect::>(), - vec!["test_1", "test_4"] - ); - } - - #[tokio::test] - async fn release_blobs() { - let storage = get_storage().await; - - for i in 1..5u32 { - storage - .set_blob(Blob { - key: format!("test_{}", i), - data: Buffer::from(vec![0, 0]), - mime: "text/plain".to_string(), - }) - .await - .unwrap(); - } - - storage - 
.delete_blob("test_2".to_string(), false) - .await - .unwrap(); - storage.release_blobs().await.unwrap(); - - let query = sqlx::query("SELECT COUNT(*) as len FROM v2_blobs;") + let record = sqlx::query!("SELECT space_id FROM v2_meta;") .fetch_one(&storage.pool) .await .unwrap(); - assert_eq!(query.get::("len"), 3); + assert_eq!(record.space_id, "test"); } } diff --git a/packages/frontend/native/src/sqlite/doc_storage/sync.rs b/packages/frontend/native/src/sqlite/doc_storage/sync.rs new file mode 100644 index 0000000000000..b12968f9b0b6b --- /dev/null +++ b/packages/frontend/native/src/sqlite/doc_storage/sync.rs @@ -0,0 +1,168 @@ +use chrono::NaiveDateTime; + +use super::storage::{Result, SqliteDocStorage}; +use super::DocClock; + +impl SqliteDocStorage { + pub async fn get_peer_clocks(&self, peer: String) -> Result> { + sqlx::query_as!( + DocClock, + "SELECT doc_id, clock as timestamp FROM v2_peer_clocks WHERE peer = ?", + peer + ) + .fetch_all(&self.pool) + .await + } + + pub async fn set_peer_clock( + &self, + peer: String, + doc_id: String, + clock: NaiveDateTime, + ) -> Result<()> { + sqlx::query( + r#" + INSERT INTO v2_peer_clocks (peer, doc_id, clock, pushed_clock) + VALUES ($1, $2, $3, 0) + ON CONFLICT(peer, doc_id) + DO UPDATE SET clock=$3 WHERE clock < $3;"#, + ) + .bind(peer) + .bind(doc_id) + .bind(clock) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn get_peer_pushed_clocks(&self, peer: String) -> Result> { + sqlx::query_as!( + DocClock, + "SELECT doc_id, pushed_clock as timestamp FROM v2_peer_clocks WHERE peer = ?", + peer + ) + .fetch_all(&self.pool) + .await + } + + pub async fn set_peer_pushed_clock( + &self, + peer: String, + doc_id: String, + clock: NaiveDateTime, + ) -> Result<()> { + sqlx::query( + r#" + INSERT INTO v2_peer_clocks (peer, doc_id, clock, pushed_clock) + VALUES ($1, $2, 0, $3) + ON CONFLICT(peer, doc_id) + DO UPDATE SET pushed_clock=$3 WHERE pushed_clock < $3;"#, + ) + .bind(peer) + .bind(doc_id) + .bind(clock) + .execute(&self.pool) + .await?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use chrono::{DateTime, Utc}; + use sqlx::Row; + + use super::*; + + async fn get_storage() -> SqliteDocStorage { + let storage = SqliteDocStorage::new(":memory:".to_string()); + storage.connect().await.unwrap(); + storage.test_migrate().await.unwrap(); + + storage + } + + #[tokio::test] + async fn set_peer_clock() { + let storage = get_storage().await; + let peer = String::from("peer1"); + + let clocks = storage.get_peer_clocks(peer.clone()).await.unwrap(); + + assert!(clocks.is_empty()); + + let clock = Utc::now().naive_utc(); + storage + .set_peer_clock(peer.clone(), "doc1".to_string(), clock) + .await + .unwrap(); + + let clocks = storage.get_peer_clocks(peer.clone()).await.unwrap(); + + assert_eq!(clocks.len(), 1); + assert_eq!(clocks.first().unwrap().doc_id, "doc1"); + assert_eq!(clocks.first().unwrap().timestamp, clock); + } + + #[tokio::test] + async fn set_peer_pushed_clock() { + let storage = get_storage().await; + let peer = String::from("peer1"); + + let clocks = storage.get_peer_pushed_clocks(peer.clone()).await.unwrap(); + + assert!(clocks.is_empty()); + + let clock = Utc::now().naive_utc(); + storage + .set_peer_pushed_clock(peer.clone(), "doc1".to_string(), clock) + .await + .unwrap(); + + let clocks = storage.get_peer_pushed_clocks(peer.clone()).await.unwrap(); + + assert_eq!(clocks.len(), 1); + assert_eq!(clocks.first().unwrap().doc_id, "doc1"); + assert_eq!(clocks.first().unwrap().timestamp, clock); + } + + #[tokio::test] + async fn 
default_clocks() { + let storage = get_storage().await; + let peer = String::from("peer1"); + + storage + .set_peer_clock(peer.clone(), "doc1".to_string(), Utc::now().naive_utc()) + .await + .unwrap(); + storage + .set_peer_pushed_clock(peer.clone(), "doc2".to_string(), Utc::now().naive_utc()) + .await + .unwrap(); + + let record = sqlx::query("SELECT * FROM v2_peer_clocks WHERE peer = ? AND doc_id = ?") + .bind(peer.clone()) + .bind("doc1") + .fetch_one(&storage.pool) + .await + .unwrap(); + + assert_eq!( + record.get::("pushed_clock"), + DateTime::from_timestamp(0, 0).unwrap().naive_utc() + ); + + let record = sqlx::query("SELECT * FROM v2_peer_clocks WHERE peer = ? AND doc_id = ?") + .bind(peer.clone()) + .bind("doc2") + .fetch_one(&storage.pool) + .await + .unwrap(); + assert_eq!( + record.get::("clock"), + DateTime::from_timestamp(0, 0).unwrap().naive_utc() + ); + } +} diff --git a/yarn.lock b/yarn.lock index 297009c8db2f6..94367db44e8a1 100644 --- a/yarn.lock +++ b/yarn.lock @@ -473,6 +473,7 @@ __metadata: version: 0.0.0-use.local resolution: "@affine/doc-storage@workspace:packages/common/doc-storage" dependencies: + "@affine/graphql": "workspace:*" "@affine/native": "workspace:*" "@types/lodash-es": "npm:^4.17.12" idb: "npm:^8.0.0"