diff --git a/packages/common/doc-storage/src/impls/cloud/blob.ts b/packages/common/doc-storage/src/impls/cloud/blob.ts new file mode 100644 index 0000000000000..0abe7594b70c3 --- /dev/null +++ b/packages/common/doc-storage/src/impls/cloud/blob.ts @@ -0,0 +1,140 @@ +import { + type Blob, + BlobStorage, + type DocStorageOptions, + type ListedBlob, +} from '../../storage'; +import type { Socket } from './socket'; + +interface CloudBlobStorageOptions extends DocStorageOptions { + socket: Socket; +} + +export class CloudBlobStorage extends BlobStorage { + get socket() { + return this.options.socket; + } + + override async connect(): Promise { + // the event will be polled, there is no need to wait for socket to be connected + await this.clientHandShake(); + // this.socket.on('space:broadcast-blob-update', this.onServerUpdates); + } + + private async clientHandShake() { + const res = await this.socket.emitWithAck('space:join', { + spaceType: this.spaceType, + spaceId: this.spaceId, + clientVersion: BUILD_CONFIG.appVersion, + }); + + if ('error' in res) { + // TODO(@forehalo): use [UserFriendlyError] + throw new Error(res.error.message); + } + } + + override async disconnect(): Promise { + this.socket.emit('space:leave', { + spaceType: this.spaceType, + spaceId: this.spaceId, + }); + // this.socket.off('space:broadcast-doc-updates', this.onServerUpdate); + } + + // onServerUpdate: ServerEventsMap['space:broadcast-blob-update'] = message => { + // if ( + // this.spaceType === message.spaceType && + // this.spaceId === message.spaceId + // ) { + // // how do we deal with the data? + // } + // }; + + override async getBlob(key: string): Promise { + const res = await this.socket.emitWithAck('space:get-blob', { + spaceType: this.spaceType, + spaceId: this.spaceId, + key, + }); + + if ('error' in res) { + // TODO: use [UserFriendlyError] + throw new Error(res.error.message); + } + + return { + ...res.data, + data: base64ToUint8Array(res.data.data), + }; + } + + override async setBlob(blob: Blob): Promise { + this.socket.emit('space:set-blob', { + spaceType: this.spaceType, + spaceId: this.spaceId, + key: blob.key, + data: await uint8ArrayToBase64(blob.data), + mime: blob.mime, + }); + } + + override async deleteBlob(key: string, permanently: boolean): Promise { + this.socket.emit('space:delete-blob', { + spaceType: this.spaceType, + spaceId: this.spaceId, + key, + permanently, + }); + } + + override async releaseBlobs(): Promise { + this.socket.emit('space:release-blobs', { + spaceType: this.spaceType, + spaceId: this.spaceId, + }); + } + + override async listBlobs(): Promise { + const res = await this.socket.emitWithAck('space:list-blobs', { + spaceType: this.spaceType, + spaceId: this.spaceId, + }); + + if ('error' in res) { + // TODO: use [UserFriendlyError] + throw new Error(res.error.message); + } + + return res.data; + } +} + +export function uint8ArrayToBase64(array: Uint8Array): Promise { + return new Promise(resolve => { + // Create a blob from the Uint8Array + const blob = new Blob([array]); + + const reader = new FileReader(); + reader.onload = function () { + const dataUrl = reader.result as string | null; + if (!dataUrl) { + resolve(''); + return; + } + // The result includes the `data:` URL prefix and the MIME type. 
We only want the Base64 data + const base64 = dataUrl.split(',')[1]; + resolve(base64); + }; + + reader.readAsDataURL(blob); + }); +} + +export function base64ToUint8Array(base64: string) { + const binaryString = atob(base64); + const binaryArray = binaryString.split('').map(function (char) { + return char.charCodeAt(0); + }); + return new Uint8Array(binaryArray); +} diff --git a/packages/common/doc-storage/src/impls/cloud/doc.ts b/packages/common/doc-storage/src/impls/cloud/doc.ts index 232ddb8b81cbc..6708090a5f2e3 100644 --- a/packages/common/doc-storage/src/impls/cloud/doc.ts +++ b/packages/common/doc-storage/src/impls/cloud/doc.ts @@ -1,85 +1,11 @@ -import type { Socket } from 'socket.io-client'; - import { DocStorage, type DocStorageOptions } from '../../storage'; - -// TODO(@forehalo): use [UserFriendlyError] -interface EventError { - name: string; - message: string; -} - -type WebsocketResponse = - | { - error: EventError; - } - | { - data: T; - }; - -interface ServerEvents { - 'space:broadcast-doc-updates': { - spaceType: string; - spaceId: string; - docId: string; - updates: string[]; - timestamp: number; - }; -} - -interface ClientEvents { - 'space:join': [ - { spaceType: string; spaceId: string; clientVersion: string }, - { clientId: string }, - ]; - 'space:leave': { spaceType: string; spaceId: string }; - 'space:push-doc-updates': [ - { spaceType: string; spaceId: string; docId: string; updates: string[] }, - { timestamp: number }, - ]; - 'space:load-doc-timestamps': [ - { - spaceType: string; - spaceId: string; - timestamp?: number; - }, - Record, - ]; - 'space:load-doc': [ - { - spaceType: string; - spaceId: string; - docId: string; - stateVector?: string; - }, - { - missing: string; - state: string; - timestamp: number; - }, - ]; -} - -type ServerEventsMap = { - [Key in keyof ServerEvents]: (data: ServerEvents[Key]) => void; -}; -type ClientEventsMap = { - [Key in keyof ClientEvents]: ClientEvents[Key] extends Array - ? 
( - data: ClientEvents[Key][0], - ack: (res: WebsocketResponse) => void - ) => void - : (data: ClientEvents[Key]) => void; -}; +import type { ServerEventsMap, Socket } from './socket'; interface CloudDocStorageOptions extends DocStorageOptions { - socket: Socket; + socket: Socket; } export class CloudDocStorage extends DocStorage { - constructor(options: CloudDocStorageOptions) { - super(options); - } - get name() { // @ts-expect-error we need it return this.options.socket.io.uri; diff --git a/packages/common/doc-storage/src/impls/cloud/socket.ts b/packages/common/doc-storage/src/impls/cloud/socket.ts new file mode 100644 index 0000000000000..7169728610317 --- /dev/null +++ b/packages/common/doc-storage/src/impls/cloud/socket.ts @@ -0,0 +1,118 @@ +import type { Socket as IO } from 'socket.io-client'; + +// TODO(@forehalo): use [UserFriendlyError] +interface EventError { + name: string; + message: string; +} + +type WebsocketResponse = + | { + error: EventError; + } + | { + data: T; + }; + +interface ServerEvents { + 'space:broadcast-doc-updates': { + spaceType: string; + spaceId: string; + docId: string; + updates: string[]; + timestamp: number; + }; + 'space:broadcast-blob-update': { + spaceType: string; + spaceId: string; + key: string; + data: string; + mime: string; + }; +} + +interface ClientEvents { + 'space:join': [ + { spaceType: string; spaceId: string; clientVersion: string }, + { clientId: string }, + ]; + 'space:leave': { spaceType: string; spaceId: string }; + 'space:push-doc-updates': [ + { spaceType: string; spaceId: string; docId: string; updates: string[] }, + { timestamp: number }, + ]; + 'space:load-doc-timestamps': [ + { + spaceType: string; + spaceId: string; + timestamp?: number; + }, + Record, + ]; + 'space:load-doc': [ + { + spaceType: string; + spaceId: string; + docId: string; + stateVector?: string; + }, + { + missing: string; + state: string; + timestamp: number; + }, + ]; + + // blobs + 'space:get-blob': [ + { + spaceType: string; + spaceId: string; + key: string; + }, + { + key: string; + data: string; + mime: string; + }, + ]; + 'space:set-blob': { + spaceType: string; + spaceId: string; + key: string; + data: string; + mime: string; + }; + 'space:delete-blob': { + spaceType: string; + spaceId: string; + key: string; + permanently: boolean; + }; + 'space:release-blobs': { + spaceType: string; + spaceId: string; + }; + 'space:list-blobs': [ + { + spaceType: string; + spaceId: string; + }, + { key: string; size: number }[], + ]; +} + +export type ServerEventsMap = { + [Key in keyof ServerEvents]: (data: ServerEvents[Key]) => void; +}; + +export type ClientEventsMap = { + [Key in keyof ClientEvents]: ClientEvents[Key] extends Array + ? 
( + data: ClientEvents[Key][0], + ack: (res: WebsocketResponse) => void + ) => void + : (data: ClientEvents[Key]) => void; +}; + +export type Socket = IO; diff --git a/packages/common/doc-storage/src/impls/idb/blob.ts b/packages/common/doc-storage/src/impls/idb/blob.ts new file mode 100644 index 0000000000000..4d03dd78e82d7 --- /dev/null +++ b/packages/common/doc-storage/src/impls/idb/blob.ts @@ -0,0 +1,74 @@ +import { type Blob, BlobStorage, type ListedBlob } from '../../storage'; +import { type SpaceIDB, SpaceIndexedDbManager } from './db'; + +export class IndexedDBBlobStorage extends BlobStorage { + private db!: SpaceIDB; + + override async connect(): Promise { + this.db = await SpaceIndexedDbManager.open( + `${this.spaceType}:${this.spaceId}` + ); + } + + override async getBlob(key: string): Promise { + const trx = this.db.transaction('blobs', 'readonly'); + const blob = await trx.store.get(key); + + if (!blob || blob.deletedAt) { + return null; + } + + return blob; + } + + override async setBlob(blob: Blob): Promise { + const trx = this.db.transaction('blobs', 'readwrite'); + await trx.store.put({ + ...blob, + size: blob.data.length, + createdAt: new Date(), + deletedAt: null, + }); + } + + override async deleteBlob(key: string, permanently = false): Promise { + const trx = this.db.transaction('blobs', 'readwrite'); + if (permanently) { + await trx.store.delete(key); + } else { + const blob = await trx.store.get(key); + if (blob) { + await trx.store.put({ + ...blob, + deletedAt: new Date(), + }); + } + } + } + + override async releaseBlobs(): Promise { + const trx = this.db.transaction('blobs', 'readwrite'); + + const it = trx.store.iterate(); + + for await (const item of it) { + if (item.value.deletedAt) { + await item.delete(); + } + } + } + + override async listBlobs(): Promise { + const trx = this.db.transaction('blobs', 'readonly'); + const it = trx.store.iterate(); + + const blobs: ListedBlob[] = []; + for await (const item of it) { + if (!item.value.deletedAt) { + blobs.push({ key: item.value.key, size: item.value.size }); + } + } + + return blobs; + } +} diff --git a/packages/common/doc-storage/src/impls/idb/schema.ts b/packages/common/doc-storage/src/impls/idb/schema.ts index 075e8018ca98a..8e2a1e7d45015 100644 --- a/packages/common/doc-storage/src/impls/idb/schema.ts +++ b/packages/common/doc-storage/src/impls/idb/schema.ts @@ -20,6 +20,11 @@ Table(Clocks) | docId | clock | |-------|--------| | str | number | + +Table(Blobs) +| key | data | mime | size | createdAt | deletedAt | +|-----|------|------|------|-----------|-----------| +| str | bin | str | num | Date | Date | */ export interface DocStorageSchema extends DBSchema { snapshots: { @@ -55,15 +60,15 @@ export interface DocStorageSchema extends DBSchema { timestamp: number; }; }; - peerClocks: { - key: [string, string]; + blobs: { + key: string; value: { - docId: string; - peerId: string; - clock: number; - }; - indexes: { - clock: number; + key: string; + data: Uint8Array; + mime: string; + size: number; + createdAt: Date; + deletedAt: Date | null; }; }; } @@ -110,6 +115,11 @@ const init: Migrate = db => { }); clocks.createIndex('timestamp', 'timestamp', { unique: false }); + + db.createObjectStore('blobs', { + keyPath: 'key', + autoIncrement: false, + }); }; // END REGION diff --git a/packages/common/doc-storage/src/impls/sqlite/blob.ts b/packages/common/doc-storage/src/impls/sqlite/blob.ts new file mode 100644 index 0000000000000..a625a7f89f291 --- /dev/null +++ b/packages/common/doc-storage/src/impls/sqlite/blob.ts 
@@ -0,0 +1,37 @@ +import type { DocStorage as NativeDocStorage } from '@affine/native'; + +import { + type Blob, + BlobStorage, + type BlobStorageOptions, + type ListedBlob, +} from '../../storage'; + +interface SqliteBlobStorageOptions extends BlobStorageOptions { + db: NativeDocStorage; +} + +export class SqliteBlobStorage extends BlobStorage { + get db() { + return this.options.db; + } + + override getBlob(key: string): Promise { + return this.db.getBlob(key); + } + override setBlob(blob: Blob): Promise { + return this.db.setBlob({ + ...blob, + data: Buffer.from(blob.data), + }); + } + override deleteBlob(key: string, permanently: boolean): Promise { + return this.db.deleteBlob(key, permanently); + } + override releaseBlobs(): Promise { + return this.db.releaseBlobs(); + } + override listBlobs(): Promise { + return this.db.listBlobs(); + } +} diff --git a/packages/common/doc-storage/src/storage/blob.ts b/packages/common/doc-storage/src/storage/blob.ts new file mode 100644 index 0000000000000..a5d6559b4b859 --- /dev/null +++ b/packages/common/doc-storage/src/storage/blob.ts @@ -0,0 +1,42 @@ +import { Connection } from './connection'; + +export interface BlobStorageOptions { + spaceType: string; + spaceId: string; +} + +export interface Blob { + key: string; + data: Uint8Array; + mime: string; +} + +export interface ListedBlob { + key: string; + size: number; +} + +export abstract class BlobStorage< + Options extends BlobStorageOptions = BlobStorageOptions, +> extends Connection { + public readonly options: Options; + + constructor(opts: Options) { + super(); + this.options = opts; + } + + get spaceType() { + return this.options.spaceType; + } + + get spaceId() { + return this.options.spaceId; + } + + abstract getBlob(key: string): Promise; + abstract setBlob(blob: Blob): Promise; + abstract deleteBlob(key: string, permanently: boolean): Promise; + abstract releaseBlobs(): Promise; + abstract listBlobs(/* pagination? */): Promise; +} diff --git a/packages/common/doc-storage/src/storage/index.ts b/packages/common/doc-storage/src/storage/index.ts index f42f6dd754a91..d476ae6eb9b92 100644 --- a/packages/common/doc-storage/src/storage/index.ts +++ b/packages/common/doc-storage/src/storage/index.ts @@ -1 +1,2 @@ +export * from './blob'; export * from './doc'; diff --git a/packages/frontend/native/index.d.ts b/packages/frontend/native/index.d.ts index ff6f82e16659c..bff1c745015f6 100644 --- a/packages/frontend/native/index.d.ts +++ b/packages/frontend/native/index.d.ts @@ -5,6 +5,11 @@ export declare class DocStorage { connect(): Promise close(): Promise get isClosed(): Promise + /** + * Flush the WAL file to the database file. + * See https://www.sqlite.org/pragma.html#pragma_wal_checkpoint:~:text=PRAGMA%20schema.wal_checkpoint%3B + */ + checkpoint(): Promise pushUpdates(docId: string, updates: Array): Promise getDocSnapshot(docId: string): Promise setDocSnapshot(snapshot: DocRecord): Promise @@ -12,11 +17,11 @@ export declare class DocStorage { markUpdatesMerged(docId: string, updates: Array): Promise deleteDoc(docId: string): Promise getDocClocks(after?: number | undefined | null): Promise> - /** - * Flush the WAL file to the database file. 
- * See https://www.sqlite.org/pragma.html#pragma_wal_checkpoint:~:text=PRAGMA%20schema.wal_checkpoint%3B - */ - checkpoint(): Promise + getBlob(key: string): Promise + setBlob(blob: Blob): Promise + deleteBlob(key: string, permanently: boolean): Promise + releaseBlobs(): Promise + listBlobs(): Promise> } export declare class SqliteConnection { @@ -56,6 +61,12 @@ export declare class SqliteConnection { checkpoint(): Promise } +export interface Blob { + key: string + data: Buffer + mime: string +} + export interface BlobRow { key: string data: Buffer @@ -84,6 +95,11 @@ export interface InsertRow { data: Uint8Array } +export interface ListedBlob { + key: string + size: number +} + export declare function mintChallengeResponse(resource: string, bits?: number | undefined | null): Promise export interface UpdateRow { diff --git a/packages/frontend/native/migrations/20240929082254_init.sql b/packages/frontend/native/migrations/20240929082254_init.sql index 054f51b4abc04..619cb49b5044e 100644 --- a/packages/frontend/native/migrations/20240929082254_init.sql +++ b/packages/frontend/native/migrations/20240929082254_init.sql @@ -17,4 +17,13 @@ CREATE INDEX updates_doc_id ON v2_updates (doc_id); CREATE TABLE "v2_clocks" ( doc_id TEXT PRIMARY KEY NOT NULL, timestamp TIMESTAMP NOT NULL -) \ No newline at end of file +); + +CREATE TABLE "v2_blobs" ( + key VARCHAR PRIMARY KEY NOT NULL, + data BLOB NOT NULL, + mime VARCHAR NOT NULL, + size INTEGER NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, + deleted_at TIMESTAMP +); \ No newline at end of file diff --git a/packages/frontend/native/src/sqlite/doc_storage/mod.rs b/packages/frontend/native/src/sqlite/doc_storage/mod.rs index 7db36e95462e0..288ec7f309e7b 100644 --- a/packages/frontend/native/src/sqlite/doc_storage/mod.rs +++ b/packages/frontend/native/src/sqlite/doc_storage/mod.rs @@ -28,6 +28,19 @@ pub struct DocClock { pub timestamp: NaiveDateTime, } +#[napi(object)] +pub struct Blob { + pub key: String, + pub data: Buffer, + pub mime: String, +} + +#[napi(object)] +pub struct ListedBlob { + pub key: String, + pub size: i64, +} + #[napi] pub struct DocStorage { storage: storage::SqliteDocStorage, @@ -59,6 +72,15 @@ impl DocStorage { Ok(self.storage.is_closed()) } + /** + * Flush the WAL file to the database file. + * See https://www.sqlite.org/pragma.html#pragma_wal_checkpoint:~:text=PRAGMA%20schema.wal_checkpoint%3B + */ + #[napi] + pub async fn checkpoint(&self) -> napi::Result<()> { + self.storage.checkpoint().await.map_err(map_err) + } + #[napi] pub async fn push_updates(&self, doc_id: String, updates: Vec) -> napi::Result { let updates = updates.iter().map(|u| u.as_ref()).collect::>(); @@ -111,12 +133,32 @@ impl DocStorage { self.storage.get_doc_clocks(after).await.map_err(map_err) } - /** - * Flush the WAL file to the database file. 
- * See https://www.sqlite.org/pragma.html#pragma_wal_checkpoint:~:text=PRAGMA%20schema.wal_checkpoint%3B - */ #[napi] - pub async fn checkpoint(&self) -> napi::Result<()> { - self.storage.checkpoint().await.map_err(map_err) + pub async fn get_blob(&self, key: String) -> napi::Result> { + self.storage.get_blob(key).await.map_err(map_err) + } + + #[napi] + pub async fn set_blob(&self, blob: Blob) -> napi::Result<()> { + self.storage.set_blob(blob).await.map_err(map_err) + } + + #[napi] + pub async fn delete_blob(&self, key: String, permanently: bool) -> napi::Result<()> { + self + .storage + .delete_blob(key, permanently) + .await + .map_err(map_err) + } + + #[napi] + pub async fn release_blobs(&self) -> napi::Result<()> { + self.storage.release_blobs().await.map_err(map_err) + } + + #[napi] + pub async fn list_blobs(&self) -> napi::Result> { + self.storage.list_blobs().await.map_err(map_err) } } diff --git a/packages/frontend/native/src/sqlite/doc_storage/storage.rs b/packages/frontend/native/src/sqlite/doc_storage/storage.rs index c56f17af20313..26661bc18bc67 100644 --- a/packages/frontend/native/src/sqlite/doc_storage/storage.rs +++ b/packages/frontend/native/src/sqlite/doc_storage/storage.rs @@ -5,7 +5,7 @@ use sqlx::{ ConnectOptions, Pool, QueryBuilder, Row, }; -use super::{DocClock, DocRecord, DocUpdate}; +use super::{Blob, DocClock, DocRecord, DocUpdate, ListedBlob}; type Result = std::result::Result; @@ -214,6 +214,67 @@ impl SqliteDocStorage { ) } + pub async fn get_blob(&self, key: String) -> Result> { + sqlx::query_as!( + Blob, + "SELECT key, data, mime FROM v2_blobs WHERE key = ? AND deleted_at IS NULL", + key + ) + .fetch_optional(&self.pool) + .await + } + + pub async fn set_blob(&self, blob: Blob) -> Result<()> { + sqlx::query( + r#" + INSERT INTO v2_blobs (key, data, mime, size) + VALUES ($1, $2, $3, $4) + ON CONFLICT(key) + DO UPDATE SET data=$2, mime=$3, size=$4, deleted_at=NULL;"#, + ) + .bind(blob.key) + .bind(blob.data.as_ref()) + .bind(blob.mime) + .bind(blob.data.len() as i64) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn delete_blob(&self, key: String, permanently: bool) -> Result<()> { + if permanently { + sqlx::query("DELETE FROM v2_blobs WHERE key = ?") + .bind(&key) + .execute(&self.pool) + .await?; + } else { + sqlx::query("UPDATE v2_blobs SET deleted_at = CURRENT_TIMESTAMP WHERE key = ?") + .bind(&key) + .execute(&self.pool) + .await?; + } + + Ok(()) + } + + pub async fn release_blobs(&self) -> Result<()> { + sqlx::query("DELETE FROM v2_blobs WHERE deleted_at IS NOT NULL;") + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn list_blobs(&self) -> Result> { + sqlx::query_as!( + ListedBlob, + "SELECT key, size FROM v2_blobs WHERE deleted_at IS NULL ORDER BY created_at DESC;" + ) + .fetch_all(&self.pool) + .await + } + /** * Flush the WAL file to the database file. 
* See https://www.sqlite.org/pragma.html#pragma_wal_checkpoint:~:text=PRAGMA%20schema.wal_checkpoint%3B
@@ -403,4 +464,139 @@ mod tests {
 
     assert_eq!(updates.len(), 1);
   }
+
+  #[tokio::test]
+  async fn set_blob() {
+    let storage = get_storage().await;
+
+    let blob = Blob {
+      key: "test".to_string(),
+      data: Buffer::from(vec![0, 0]),
+      mime: "text/plain".to_string(),
+    };
+
+    storage.set_blob(blob).await.unwrap();
+
+    let result = storage.get_blob("test".to_string()).await.unwrap();
+
+    assert!(result.is_some());
+    assert_eq!(result.unwrap().data.as_ref(), vec![0, 0]);
+  }
+
+  #[tokio::test]
+  async fn delete_blob() {
+    let storage = get_storage().await;
+
+    for i in 1..5u32 {
+      storage
+        .set_blob(Blob {
+          key: format!("test_{}", i),
+          data: Buffer::from(vec![0, 0]),
+          mime: "text/plain".to_string(),
+        })
+        .await
+        .unwrap();
+    }
+
+    let result = storage.get_blob("test_1".to_string()).await.unwrap();
+
+    assert!(result.is_some());
+
+    storage
+      .delete_blob("test_1".to_string(), false)
+      .await
+      .unwrap();
+
+    let result = storage.get_blob("test_1".to_string()).await.unwrap();
+    assert!(result.is_none());
+
+    storage
+      .delete_blob("test_2".to_string(), true)
+      .await
+      .unwrap();
+
+    let result = storage.get_blob("test_2".to_string()).await.unwrap();
+    assert!(result.is_none());
+  }
+
+  #[tokio::test]
+  async fn list_blobs() {
+    let storage = get_storage().await;
+
+    let blobs = storage.list_blobs().await.unwrap();
+
+    assert_eq!(blobs.len(), 0);
+
+    for i in 1..5u32 {
+      storage
+        .set_blob(Blob {
+          key: format!("test_{}", i),
+          data: Buffer::from(vec![0, 0]),
+          mime: "text/plain".to_string(),
+        })
+        .await
+        .unwrap();
+    }
+
+    let blobs = storage.list_blobs().await.unwrap();
+
+    assert_eq!(blobs.len(), 4);
+    assert_eq!(
+      blobs.iter().map(|b| b.key.as_str()).collect::<Vec<&str>>(),
+      vec!["test_1", "test_2", "test_3", "test_4"]
+    );
+
+    storage
+      .delete_blob("test_2".to_string(), false)
+      .await
+      .unwrap();
+
+    storage
+      .delete_blob("test_3".to_string(), true)
+      .await
+      .unwrap();
+
+    let query = sqlx::query("SELECT COUNT(*) as len FROM v2_blobs;")
+      .fetch_one(&storage.pool)
+      .await
+      .unwrap();
+
+    assert_eq!(query.get::<i64, &str>("len"), 3);
+
+    let blobs = storage.list_blobs().await.unwrap();
+    assert_eq!(blobs.len(), 2);
+    assert_eq!(
+      blobs.iter().map(|b| b.key.as_str()).collect::<Vec<&str>>(),
+      vec!["test_1", "test_4"]
+    );
+  }
+
+  #[tokio::test]
+  async fn release_blobs() {
+    let storage = get_storage().await;
+
+    for i in 1..5u32 {
+      storage
+        .set_blob(Blob {
+          key: format!("test_{}", i),
+          data: Buffer::from(vec![0, 0]),
+          mime: "text/plain".to_string(),
+        })
+        .await
+        .unwrap();
+    }
+
+    storage
+      .delete_blob("test_2".to_string(), false)
+      .await
+      .unwrap();
+    storage.release_blobs().await.unwrap();
+
+    let query = sqlx::query("SELECT COUNT(*) as len FROM v2_blobs;")
+      .fetch_one(&storage.pool)
+      .await
+      .unwrap();
+
+    assert_eq!(query.get::<i64, &str>("len"), 3);
+  }
 }
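
For context, here is a minimal usage sketch of the new `BlobStorage` surface, based on the abstract class in `storage/blob.ts` and the IndexedDB implementation above. The import path and the example `spaceType`/`spaceId` values are assumptions for illustration, not the package's published entry point:

```ts
// Minimal sketch — import path and option values are assumptions.
import { IndexedDBBlobStorage } from '@affine/doc-storage/impls/idb/blob';

async function example() {
  const blobs = new IndexedDBBlobStorage({
    spaceType: 'workspace', // hypothetical values
    spaceId: 'my-space-id',
  });
  // opens the `${spaceType}:${spaceId}` IndexedDB database
  await blobs.connect();

  await blobs.setBlob({
    key: 'avatar',
    data: new Uint8Array([1, 2, 3]),
    mime: 'application/octet-stream',
  });

  const blob = await blobs.getBlob('avatar');
  console.log(blob?.mime); // 'application/octet-stream'

  // soft delete: the row is kept with `deletedAt` set, so reads and listing skip it
  await blobs.deleteBlob('avatar', false);
  console.log(await blobs.listBlobs()); // []

  // purge every soft-deleted blob for good
  await blobs.releaseBlobs();
}
```

The SQLite and cloud implementations expose the same surface; only `deleteBlob(key, true)` removes a row immediately, while `releaseBlobs()` cleans up everything that was soft-deleted.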
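
`CloudBlobStorage` transports blob bytes as base64 strings on the `space:get-blob` / `space:set-blob` events, using the two helpers exported from `impls/cloud/blob.ts`. A small round-trip sketch (the import path is an assumption):

```ts
// Round-trip sketch — import path assumed.
import {
  base64ToUint8Array,
  uint8ArrayToBase64,
} from '@affine/doc-storage/impls/cloud/blob';

async function roundTrip() {
  const original = new Uint8Array([0xde, 0xad, 0xbe, 0xef]);

  // async because the encoder goes through a FileReader data URL
  const encoded = await uint8ArrayToBase64(original); // '3q2+7w=='
  const decoded = base64ToUint8Array(encoded); // Uint8Array [222, 173, 190, 239]

  console.log(encoded, decoded);
}
```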