Skip to content

Commit

Permalink
chore(doc-storage): organize code
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Oct 25, 2024
1 parent 0fe9f11 commit b160b2b
Show file tree
Hide file tree
Showing 58 changed files with 1,947 additions and 1,114 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- CreateTable
CREATE TABLE "blobs" (
"workspace_id" VARCHAR NOT NULL,
"key" VARCHAR NOT NULL,
"size" INTEGER NOT NULL,
"mime" VARCHAR NOT NULL,
"created_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"deleted_at" TIMESTAMPTZ(3),

CONSTRAINT "blobs_pkey" PRIMARY KEY ("workspace_id","key")
);

-- AddForeignKey
ALTER TABLE "blobs" ADD CONSTRAINT "blobs_workspace_id_fkey" FOREIGN KEY ("workspace_id") REFERENCES "workspaces"("id") ON DELETE CASCADE ON UPDATE CASCADE;
20 changes: 19 additions & 1 deletion packages/backend/server/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ model Workspace {
permissions WorkspaceUserPermission[]
pagePermissions WorkspacePageUserPermission[]
features WorkspaceFeature[]
blobs Blob[]
@@map("workspaces")
}
Expand Down Expand Up @@ -335,7 +336,7 @@ model UserSubscription {
// yearly/monthly/lifetime
recurring String @db.VarChar(20)
// onetime subscription or anything else
variant String? @db.VarChar(20)
variant String? @db.VarChar(20)
// subscription.id, null for linefetime payment or one time payment subscription
stripeSubscriptionId String? @unique @map("stripe_subscription_id")
// subscription.status, active/past_due/canceled/unpaid...
Expand Down Expand Up @@ -499,3 +500,20 @@ model RuntimeConfig {
@@unique([module, key])
@@map("app_runtime_settings")
}

// Blob table only exists for fast non-data queries.
// like, total size of blobs in a workspace, or blob list for sync service.
// it should only be a map of metadata of blobs stored anywhere else
model Blob {
workspaceId String @map("workspace_id") @db.VarChar
key String @db.VarChar
size Int @db.Integer
mime String @db.VarChar
createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(3)
deletedAt DateTime? @map("deleted_at") @db.Timestamptz(3)
workspace Workspace @relation(fields: [workspaceId], references: [id], onDelete: Cascade)
@@id([workspaceId, key])
@@map("blobs")
}
23 changes: 22 additions & 1 deletion packages/backend/server/src/core/quota/resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { CurrentUser } from '../auth/session';
import { EarlyAccessType } from '../features';
import { UserType } from '../user';
import { QuotaService } from './service';
import { QuotaManagementService } from './storage';

registerEnumType(EarlyAccessType, {
name: 'EarlyAccessType',
Expand Down Expand Up @@ -55,14 +56,34 @@ class UserQuotaType {
humanReadable!: UserQuotaHumanReadableType;
}

@ObjectType('UserQuotaUsage')
class UserQuotaUsageType {
@Field(() => SafeIntResolver, { name: 'storageQuota' })
storageQuota!: number;
}

@Resolver(() => UserType)
export class QuotaManagementResolver {
constructor(private readonly quota: QuotaService) {}
constructor(
private readonly quota: QuotaService,
private readonly management: QuotaManagementService
) {}

@ResolveField(() => UserQuotaType, { name: 'quota', nullable: true })
async getQuota(@CurrentUser() me: UserType) {
const quota = await this.quota.getUserQuota(me.id);

return quota.feature;
}

@ResolveField(() => UserQuotaUsageType, { name: 'quotaUsage' })
async getQuotaUsage(
@CurrentUser() me: UserType
): Promise<UserQuotaUsageType> {
const usage = await this.management.getUserStorageUsage(me.id);

return {
storageQuota: usage,
};
}
}
6 changes: 3 additions & 3 deletions packages/backend/server/src/core/quota/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export class QuotaManagementService {
};
}

async getUserUsage(userId: string) {
async getUserStorageUsage(userId: string) {
const workspaces = await this.permissions.getOwnedWorkspaces(userId);

const sizes = await Promise.allSettled(
Expand Down Expand Up @@ -88,7 +88,7 @@ export class QuotaManagementService {
async getQuotaCalculator(userId: string) {
const quota = await this.getUserQuota(userId);
const { storageQuota, businessBlobLimit } = quota;
const usedSize = await this.getUserUsage(userId);
const usedSize = await this.getUserStorageUsage(userId);

return this.generateQuotaCalculator(
storageQuota,
Expand Down Expand Up @@ -128,7 +128,7 @@ export class QuotaManagementService {
},
} = await this.quota.getUserQuota(owner.id);
// get all workspaces size of owner used
const usedSize = await this.getUserUsage(owner.id);
const usedSize = await this.getUserStorageUsage(owner.id);
// relax restrictions if workspace has unlimited feature
// todo(@darkskygit): need a mechanism to allow feature as a middleware to edit quota
const unlimited = await this.feature.hasWorkspaceFeature(
Expand Down
176 changes: 142 additions & 34 deletions packages/backend/server/src/core/storage/wrappers/blob.ts
Original file line number Diff line number Diff line change
@@ -1,75 +1,177 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { PrismaClient } from '@prisma/client';

import {
type BlobInputType,
Cache,
Config,
EventEmitter,
type EventPayload,
type ListObjectsMetadata,
type GetObjectMetadata,
ListObjectsMetadata,
OnEvent,
type StorageProvider,
StorageProviderFactory,
} from '../../../fundamentals';

@Injectable()
export class WorkspaceBlobStorage {
private readonly logger = new Logger(WorkspaceBlobStorage.name);
public readonly provider: StorageProvider;

constructor(
private readonly config: Config,
private readonly event: EventEmitter,
private readonly storageFactory: StorageProviderFactory,
private readonly cache: Cache
private readonly db: PrismaClient
) {
this.provider = this.storageFactory.create(this.config.storages.blob);
}

async put(workspaceId: string, key: string, blob: BlobInputType) {
await this.provider.put(`${workspaceId}/${key}`, blob);
await this.cache.delete(`blob-list:${workspaceId}`);
async put(workspaceId: string, key: string, blob: Buffer, mime: string) {
const meta: GetObjectMetadata = {
contentType: mime,
contentLength: blob.byteLength,
lastModified: new Date(),
};
await this.provider.put(`${workspaceId}/${key}`, blob, meta);
this.trySyncBlobMeta(workspaceId, key, meta);
}

async get(workspaceId: string, key: string) {
return this.provider.get(`${workspaceId}/${key}`);
const blob = await this.provider.get(`${workspaceId}/${key}`);
this.trySyncBlobMeta(workspaceId, key, blob.metadata);
return blob;
}

async list(workspaceId: string) {
const cachedList = await this.cache.list<ListObjectsMetadata>(
`blob-list:${workspaceId}`,
0,
-1
);

if (cachedList.length > 0) {
return cachedList;
const blobsInDb = await this.db.blob.findMany({
where: {
workspaceId,
deletedAt: null,
},
});

if (blobsInDb.length > 0) {
return blobsInDb;
}

const blobs = await this.provider.list(workspaceId + '/');
this.trySyncBlobsMeta(workspaceId, blobs);

return blobs.map(blob => ({
key: blob.key,
size: blob.contentLength,
createdAt: blob.lastModified,
mime: 'application/octet-stream',
}));
}

blobs.forEach(item => {
// trim workspace prefix
item.key = item.key.slice(workspaceId.length + 1);
});

await this.cache.pushBack(`blob-list:${workspaceId}`, ...blobs);

return blobs;
async delete(workspaceId: string, key: string, permanently = false) {
if (permanently) {
await this.provider.delete(`${workspaceId}/${key}`);
await this.db.blob.deleteMany({
where: {
workspaceId,
key,
},
});
} else {
await this.db.blob.update({
where: {
workspaceId_key: {
workspaceId,
key,
},
},
data: {
deletedAt: new Date(),
},
});
}
}

/**
* we won't really delete the blobs until the doc blobs manager is implemented sounded
*/
async delete(_workspaceId: string, _key: string) {
// return this.provider.delete(`${workspaceId}/${key}`);
async release(workspaceId: string) {
const deletedBlobs = await this.db.blob.findMany({
where: {
workspaceId,
deletedAt: {
not: null,
},
},
});

deletedBlobs.forEach(blob => {
this.event.emit('workspace.blob.deleted', {
workspaceId: workspaceId,
key: blob.key,
});
});
}

async totalSize(workspaceId: string) {
const blobs = await this.list(workspaceId);
// how could we ignore the ones get soft-deleted?
return blobs.reduce((acc, item) => acc + item.size, 0);
}

private trySyncBlobsMeta(workspaceId: string, blobs: ListObjectsMetadata[]) {
for (const blob of blobs) {
this.trySyncBlobMeta(workspaceId, blob.key, {
contentType: 'application/octet-stream',
...blob,
});
}
}

private trySyncBlobMeta(
workspaceId: string,
key: string,
meta?: GetObjectMetadata
) {
setImmediate(() => {
this.syncBlobMeta(workspaceId, key, meta).catch(() => {
/* never throw */
});
});
}

private async syncBlobMeta(
workspaceId: string,
key: string,
meta?: GetObjectMetadata
) {
try {
if (meta) {
await this.db.blob.upsert({
where: {
workspaceId_key: {
workspaceId,
key,
},
},
update: {
mime: meta.contentType,
size: meta.contentLength,
},
create: {
workspaceId,
key,
mime: meta.contentType,
size: meta.contentLength,
},
});
} else {
await this.db.blob.deleteMany({
where: {
workspaceId,
key,
},
});
}
} catch (e) {
// never throw
this.logger.error('failed to sync blob meta to DB', e);
}
}

@OnEvent('workspace.deleted')
async onWorkspaceDeleted(workspaceId: EventPayload<'workspace.deleted'>) {
const blobs = await this.list(workspaceId);
Expand All @@ -78,16 +180,22 @@ export class WorkspaceBlobStorage {
blobs.forEach(blob => {
this.event.emit('workspace.blob.deleted', {
workspaceId: workspaceId,
name: blob.key,
key: blob.key,
});
});
}

@OnEvent('workspace.blob.deleted')
async onDeleteWorkspaceBlob({
workspaceId,
name,
key,
}: EventPayload<'workspace.blob.deleted'>) {
await this.delete(workspaceId, name);
await this.db.blob.deleteMany({
where: {
workspaceId,
key,
},
});
await this.delete(workspaceId, key);
}
}
21 changes: 1 addition & 20 deletions packages/backend/server/src/core/sync/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ interface UpdateAwarenessMessage {
docId: string;
awarenessUpdate: string;
}

@WebSocketGateway()
export class SpaceSyncGateway
implements OnGatewayConnection, OnGatewayDisconnect
Expand Down Expand Up @@ -181,26 +182,6 @@ export class SpaceSyncGateway
}
}

async joinWorkspace(
client: Socket,
room: `${string}:${'sync' | 'awareness'}`
) {
await client.join(room);
}

async leaveWorkspace(
client: Socket,
room: `${string}:${'sync' | 'awareness'}`
) {
await client.leave(room);
}

assertInWorkspace(client: Socket, room: `${string}:${'sync' | 'awareness'}`) {
if (!client.rooms.has(room)) {
throw new NotInSpace({ spaceId: room.split(':')[0] });
}
}

// v3
@SubscribeMessage('space:join')
async onJoinSpace(
Expand Down
Loading

0 comments on commit b160b2b

Please sign in to comment.