Skip to content

Commit

Permalink
feat(doc-storage): impl blob storages
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Oct 17, 2024
1 parent 9a6c896 commit 5355e89
Show file tree
Hide file tree
Showing 9 changed files with 448 additions and 21 deletions.
74 changes: 74 additions & 0 deletions packages/common/doc-storage/src/impls/idb/blob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { type Blob, BlobStorage, type ListedBlob } from '../../storage';
import { type SpaceIDB, SpaceIndexedDbManager } from './db';

export class IndexedDBBlobStorage extends BlobStorage {
private db!: SpaceIDB;

override async connect(): Promise<void> {
this.db = await SpaceIndexedDbManager.open(
`${this.spaceType}:${this.spaceId}`
);
}

override async getBlob(key: string): Promise<Blob | null> {
const trx = this.db.transaction('blobs', 'readonly');
const blob = await trx.store.get(key);

if (!blob || blob.deletedAt) {
return null;
}

return blob;
}

override async setBlob(blob: Blob): Promise<void> {
const trx = this.db.transaction('blobs', 'readwrite');
await trx.store.put({
...blob,
size: blob.data.length,
createdAt: new Date(),
deletedAt: null,
});
}

override async deleteBlob(key: string, permanently = false): Promise<void> {
const trx = this.db.transaction('blobs', 'readwrite');
if (permanently) {
await trx.store.delete(key);
} else {
const blob = await trx.store.get(key);
if (blob) {
await trx.store.put({
...blob,
deletedAt: new Date(),
});
}
}
}

override async releaseBlobs(): Promise<void> {
const trx = this.db.transaction('blobs', 'readwrite');

const it = trx.store.iterate();

for await (const item of it) {
if (item.value.deletedAt) {
await item.delete();
}
}
}

override async listBlobs(): Promise<ListedBlob[]> {
const trx = this.db.transaction('blobs', 'readonly');
const it = trx.store.iterate();

const blobs: ListedBlob[] = [];
for await (const item of it) {
if (!item.value.deletedAt) {
blobs.push({ key: item.value.key, size: item.value.size });
}
}

return blobs;
}
}
26 changes: 18 additions & 8 deletions packages/common/doc-storage/src/impls/idb/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ Table(Clocks)
| docId | clock |
|-------|--------|
| str | number |
Table(Blobs)
| key | data | mime | size | createdAt | deletedAt |
|-----|------|------|------|-----------|-----------|
| str | bin | str | num | Date | Date |
*/
export interface DocStorageSchema extends DBSchema {
snapshots: {
Expand Down Expand Up @@ -55,15 +60,15 @@ export interface DocStorageSchema extends DBSchema {
timestamp: number;
};
};
peerClocks: {
key: [string, string];
blobs: {
key: string;
value: {
docId: string;
peerId: string;
clock: number;
};
indexes: {
clock: number;
key: string;
data: Uint8Array;
mime: string;
size: number;
createdAt: Date;
deletedAt: Date | null;
};
};
}
Expand Down Expand Up @@ -110,6 +115,11 @@ const init: Migrate = db => {
});

clocks.createIndex('timestamp', 'timestamp', { unique: false });

db.createObjectStore('blobs', {
keyPath: 'key',
autoIncrement: false,
});
};
// END REGION

Expand Down
37 changes: 37 additions & 0 deletions packages/common/doc-storage/src/impls/sqlite/blob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import type { DocStorage as NativeDocStorage } from '@affine/native';

import {
type Blob,
BlobStorage,
type BlobStorageOptions,
type ListedBlob,
} from '../../storage';

interface SqliteBlobStorageOptions extends BlobStorageOptions {
db: NativeDocStorage;
}

export class SqliteBlobStorage extends BlobStorage<SqliteBlobStorageOptions> {
get db() {
return this.options.db;
}

override getBlob(key: string): Promise<Blob | null> {
return this.db.getBlob(key);
}
override setBlob(blob: Blob): Promise<void> {
return this.db.setBlob({
...blob,
data: Buffer.from(blob.data),
});
}
override deleteBlob(key: string, permanently: boolean): Promise<void> {
return this.db.deleteBlob(key, permanently);
}
override releaseBlobs(): Promise<void> {
return this.db.releaseBlobs();
}
override listBlobs(): Promise<ListedBlob[]> {
return this.db.listBlobs();
}
}
42 changes: 42 additions & 0 deletions packages/common/doc-storage/src/storage/blob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { Connection } from './connection';

export interface BlobStorageOptions {
spaceType: string;
spaceId: string;
}

export interface Blob {
key: string;
data: Uint8Array;
mime: string;
}

export interface ListedBlob {
key: string;
size: number;
}

export abstract class BlobStorage<
Options extends BlobStorageOptions = BlobStorageOptions,
> extends Connection {
public readonly options: Options;

constructor(opts: Options) {
super();
this.options = opts;
}

get spaceType() {
return this.options.spaceType;
}

get spaceId() {
return this.options.spaceId;
}

abstract getBlob(key: string): Promise<Blob | null>;
abstract setBlob(blob: Blob): Promise<void>;
abstract deleteBlob(key: string, permanently: boolean): Promise<void>;
abstract releaseBlobs(): Promise<void>;
abstract listBlobs(/* pagination? */): Promise<ListedBlob[]>;
}
1 change: 1 addition & 0 deletions packages/common/doc-storage/src/storage/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './blob';
export * from './doc';
26 changes: 21 additions & 5 deletions packages/frontend/native/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,23 @@ export declare class DocStorage {
connect(): Promise<void>
close(): Promise<void>
get isClosed(): Promise<boolean>
/**
* Flush the WAL file to the database file.
* See https://www.sqlite.org/pragma.html#pragma_wal_checkpoint:~:text=PRAGMA%20schema.wal_checkpoint%3B
*/
checkpoint(): Promise<void>
pushUpdates(docId: string, updates: Array<Uint8Array>): Promise<number>
getDocSnapshot(docId: string): Promise<DocRecord | null>
setDocSnapshot(snapshot: DocRecord): Promise<boolean>
getDocUpdates(docId: string): Promise<Array<DocUpdate>>
markUpdatesMerged(docId: string, updates: Array<Date>): Promise<number>
deleteDoc(docId: string): Promise<void>
getDocClocks(after?: number | undefined | null): Promise<Array<DocClock>>
/**
* Flush the WAL file to the database file.
* See https://www.sqlite.org/pragma.html#pragma_wal_checkpoint:~:text=PRAGMA%20schema.wal_checkpoint%3B
*/
checkpoint(): Promise<void>
getBlob(key: string): Promise<Blob | null>
setBlob(blob: Blob): Promise<void>
deleteBlob(key: string, permanently: boolean): Promise<void>
releaseBlobs(): Promise<void>
listBlobs(): Promise<Array<ListedBlob>>
}

export declare class SqliteConnection {
Expand Down Expand Up @@ -56,6 +61,12 @@ export declare class SqliteConnection {
checkpoint(): Promise<void>
}

export interface Blob {
key: string
data: Buffer
mime: string
}

export interface BlobRow {
key: string
data: Buffer
Expand Down Expand Up @@ -84,6 +95,11 @@ export interface InsertRow {
data: Uint8Array
}

export interface ListedBlob {
key: string
size: number
}

export declare function mintChallengeResponse(resource: string, bits?: number | undefined | null): Promise<string>

export interface UpdateRow {
Expand Down
11 changes: 10 additions & 1 deletion packages/frontend/native/migrations/20240929082254_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,13 @@ CREATE INDEX updates_doc_id ON v2_updates (doc_id);
CREATE TABLE "v2_clocks" (
doc_id TEXT PRIMARY KEY NOT NULL,
timestamp TIMESTAMP NOT NULL
)
);

CREATE TABLE "v2_blobs" (
key VARCHAR PRIMARY KEY NOT NULL,
data BLOB NOT NULL,
mime VARCHAR NOT NULL,
size INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
deleted_at TIMESTAMP
);
54 changes: 48 additions & 6 deletions packages/frontend/native/src/sqlite/doc_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ pub struct DocClock {
pub timestamp: NaiveDateTime,
}

#[napi(object)]
pub struct Blob {
pub key: String,
pub data: Buffer,
pub mime: String,
}

#[napi(object)]
pub struct ListedBlob {
pub key: String,
pub size: i64,
}

#[napi]
pub struct DocStorage {
storage: storage::SqliteDocStorage,
Expand Down Expand Up @@ -59,6 +72,15 @@ impl DocStorage {
Ok(self.storage.is_closed())
}

/**
* Flush the WAL file to the database file.
* See https://www.sqlite.org/pragma.html#pragma_wal_checkpoint:~:text=PRAGMA%20schema.wal_checkpoint%3B
*/
#[napi]
pub async fn checkpoint(&self) -> napi::Result<()> {
self.storage.checkpoint().await.map_err(map_err)
}

#[napi]
pub async fn push_updates(&self, doc_id: String, updates: Vec<Uint8Array>) -> napi::Result<u32> {
let updates = updates.iter().map(|u| u.as_ref()).collect::<Vec<_>>();
Expand Down Expand Up @@ -111,12 +133,32 @@ impl DocStorage {
self.storage.get_doc_clocks(after).await.map_err(map_err)
}

/**
* Flush the WAL file to the database file.
* See https://www.sqlite.org/pragma.html#pragma_wal_checkpoint:~:text=PRAGMA%20schema.wal_checkpoint%3B
*/
#[napi]
pub async fn checkpoint(&self) -> napi::Result<()> {
self.storage.checkpoint().await.map_err(map_err)
pub async fn get_blob(&self, key: String) -> napi::Result<Option<Blob>> {
self.storage.get_blob(key).await.map_err(map_err)
}

#[napi]
pub async fn set_blob(&self, blob: Blob) -> napi::Result<()> {
self.storage.set_blob(blob).await.map_err(map_err)
}

#[napi]
pub async fn delete_blob(&self, key: String, permanently: bool) -> napi::Result<()> {
self
.storage
.delete_blob(key, permanently)
.await
.map_err(map_err)
}

#[napi]
pub async fn release_blobs(&self) -> napi::Result<()> {
self.storage.release_blobs().await.map_err(map_err)
}

#[napi]
pub async fn list_blobs(&self) -> napi::Result<Vec<ListedBlob>> {
self.storage.list_blobs().await.map_err(map_err)
}
}
Loading

0 comments on commit 5355e89

Please sign in to comment.