Skip to content

Commit

Permalink
feat(storage): support soft delete blobs (#561)
Browse files Browse the repository at this point in the history
* feat(storage): support soft delete blobs

* fix: lint & delete test case

---------

Co-authored-by: DarkSky <[email protected]>
  • Loading branch information
forehalo and darkskygit authored Nov 25, 2023
1 parent 4824984 commit 49a6b7a
Show file tree
Hide file tree
Showing 13 changed files with 131 additions and 28 deletions.
4 changes: 2 additions & 2 deletions libs/jwst-codec-utils/src/doc_operation/yrs_op/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub fn yrs_create_nest_type_from_root(doc: &yrs::Doc, target_type: CRDTNestType,
}
}

pub fn gen_nest_type_from_root(doc: &mut Doc, crdt_param: &CRDTParam) -> Option<YrsNestType> {
pub fn gen_nest_type_from_root(doc: &Doc, crdt_param: &CRDTParam) -> Option<YrsNestType> {
match crdt_param.new_nest_type {
CRDTNestType::Array => Some(yrs_create_nest_type_from_root(
doc,
Expand Down Expand Up @@ -124,7 +124,7 @@ pub fn gen_nest_type_from_root(doc: &mut Doc, crdt_param: &CRDTParam) -> Option<
}

pub fn gen_nest_type_from_nest_type(
doc: &mut Doc,
doc: &Doc,
crdt_param: CRDTParam,
nest_type: &mut YrsNestType,
) -> Option<YrsNestType> {
Expand Down
3 changes: 2 additions & 1 deletion libs/jwst-storage/src/entities/blobs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6
use sea_orm::entity::prelude::*;

Expand All @@ -13,6 +13,7 @@ pub struct Model {
pub blob: Vec<u8>,
pub length: i64,
pub created_at: DateTimeWithTimeZone,
pub deleted_at: Option<DateTimeWithTimeZone>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-storage/src/entities/bucket_blobs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6
use sea_orm::entity::prelude::*;

Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-storage/src/entities/diff_log.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6
use sea_orm::entity::prelude::*;

Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-storage/src/entities/docs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6
use sea_orm::entity::prelude::*;

Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-storage/src/entities/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6
pub mod prelude;

Expand Down
3 changes: 2 additions & 1 deletion libs/jwst-storage/src/entities/optimized_blobs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6
use sea_orm::entity::prelude::*;

Expand All @@ -15,6 +15,7 @@ pub struct Model {
pub created_at: DateTimeWithTimeZone,
#[sea_orm(primary_key, auto_increment = false)]
pub params: String,
pub deleted_at: Option<DateTimeWithTimeZone>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
11 changes: 6 additions & 5 deletions libs/jwst-storage/src/entities/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6
pub use super::{
blobs::Entity as Blobs, bucket_blobs::Entity as BucketBlobs, diff_log::Entity as DiffLog, docs::Entity as Docs,
optimized_blobs::Entity as OptimizedBlobs,
};
pub use super::blobs::Entity as Blobs;
pub use super::bucket_blobs::Entity as BucketBlobs;
pub use super::diff_log::Entity as DiffLog;
pub use super::docs::Entity as Docs;
pub use super::optimized_blobs::Entity as OptimizedBlobs;
2 changes: 2 additions & 0 deletions libs/jwst-storage/src/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod m20230321_000001_blob_optimized_table;
mod m20230614_000001_initial_bucket_blob_table;
mod m20230626_023319_doc_guid;
mod m20230814_061223_initial_diff_log_table;
mod m20231124_082401_blob_deleted_at;
mod schemas;

pub struct Migrator;
Expand All @@ -20,6 +21,7 @@ impl MigratorTrait for Migrator {
Box::new(m20230614_000001_initial_bucket_blob_table::Migration),
Box::new(m20230626_023319_doc_guid::Migration),
Box::new(m20230814_061223_initial_diff_log_table::Migration),
Box::new(m20231124_082401_blob_deleted_at::Migration),
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use sea_orm_migration::prelude::*;

use crate::schemas::{Blobs, OptimizedBlobs};

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Blobs::Table)
.add_column(ColumnDef::new(Blobs::DeletedAt).timestamp_with_time_zone().null())
.to_owned(),
)
.await?;

manager
.alter_table(
Table::alter()
.table(OptimizedBlobs::Table)
.add_column(
ColumnDef::new(OptimizedBlobs::DeletedAt)
.timestamp_with_time_zone()
.null(),
)
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Blobs::Table)
.drop_column(Blobs::DeletedAt)
.to_owned(),
)
.await?;

manager
.alter_table(
Table::alter()
.table(OptimizedBlobs::Table)
.drop_column(OptimizedBlobs::DeletedAt)
.to_owned(),
)
.await
}
}
4 changes: 4 additions & 0 deletions libs/jwst-storage/src/migration/src/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub enum Blobs {
Length,
#[iden = "created_at"]
Timestamp,
#[iden = "deleted_at"]
DeletedAt,
}

#[derive(Iden)]
Expand All @@ -36,6 +38,8 @@ pub enum OptimizedBlobs {
#[iden = "created_at"]
Timestamp,
Params,
#[iden = "deleted_at"]
DeletedAt,
}

#[derive(Iden)]
Expand Down
34 changes: 23 additions & 11 deletions libs/jwst-storage/src/storage/blobs/auto_storage/auto_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,31 @@ impl BlobAutoStorage {

async fn exists(&self, table: &str, hash: &str, params: &str) -> JwstBlobResult<bool> {
Ok(OptimizedBlobs::find_by_id((table.into(), hash.into(), params.into()))
.filter(OptimizedBlobColumn::DeletedAt.is_null())
.count(&self.pool)
.await
.map(|c| c > 0)?)
}

async fn insert(&self, table: &str, hash: &str, params: &str, blob: &[u8]) -> JwstBlobResult<()> {
if !self.exists(table, hash, params).await? {
async fn insert(&self, workspace: &str, hash: &str, params: &str, blob: &[u8]) -> JwstBlobResult<()> {
if let Some(model) = OptimizedBlobs::find_by_id((workspace.into(), hash.into(), params.into()))
.one(&self.pool)
.await?
{
if model.deleted_at.is_some() {
let mut model: OptimizedBlobActiveModel = model.into();
model.deleted_at = Set(None);
model.update(&self.pool).await?;
}
} else {
OptimizedBlobs::insert(OptimizedBlobActiveModel {
workspace_id: Set(table.into()),
workspace_id: Set(workspace.into()),
hash: Set(hash.into()),
blob: Set(blob.into()),
length: Set(blob.len().try_into().unwrap()),
params: Set(params.into()),
created_at: Set(Utc::now().into()),
deleted_at: Set(None),
})
.exec(&self.pool)
.await?;
Expand All @@ -53,6 +64,7 @@ impl BlobAutoStorage {

async fn get(&self, table: &str, hash: &str, params: &str) -> JwstBlobResult<OptimizedBlobModel> {
OptimizedBlobs::find_by_id((table.into(), hash.into(), params.into()))
.filter(OptimizedBlobColumn::DeletedAt.is_null())
.one(&self.pool)
.await
.map_err(|e| e.into())
Expand Down Expand Up @@ -141,16 +153,16 @@ impl BlobAutoStorage {
}
}

async fn delete(&self, table: &str, hash: &str) -> JwstBlobResult<u64> {
Ok(OptimizedBlobs::delete_many()
.filter(
OptimizedBlobColumn::WorkspaceId
.eq(table)
.and(OptimizedBlobColumn::Hash.eq(hash)),
)
async fn delete(&self, workspace: &str, hash: &str) -> JwstBlobResult<u64> {
OptimizedBlobs::update_many()
.col_expr(OptimizedBlobColumn::DeletedAt, Expr::value(Utc::now()))
.filter(OptimizedBlobColumn::WorkspaceId.eq(workspace))
.filter(OptimizedBlobColumn::Hash.eq(hash))
.filter(OptimizedBlobColumn::DeletedAt.is_null())
.exec(&self.pool)
.await
.map(|r| r.rows_affected)?)
.map(|r| r.rows_affected)
.map_err(|e| e.into())
}

async fn drop(&self, table: &str) -> Result<(), DbErr> {
Expand Down
37 changes: 33 additions & 4 deletions libs/jwst-storage/src/storage/blobs/blob_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl BlobDBStorage {
async fn all(&self, workspace: &str) -> Result<Vec<BlobModel>, DbErr> {
Blobs::find()
.filter(BlobColumn::WorkspaceId.eq(workspace))
.filter(BlobColumn::DeletedAt.is_null())
.all(&self.pool)
.await
}
Expand All @@ -48,6 +49,7 @@ impl BlobDBStorage {

Blobs::find()
.filter(BlobColumn::WorkspaceId.eq(workspace))
.filter(BlobColumn::DeletedAt.is_null())
.select_only()
.column(BlobColumn::Hash)
.into_model::<BlobHash>()
Expand All @@ -60,12 +62,14 @@ impl BlobDBStorage {
async fn count(&self, workspace: &str) -> Result<u64, DbErr> {
Blobs::find()
.filter(BlobColumn::WorkspaceId.eq(workspace))
.filter(BlobColumn::DeletedAt.is_null())
.count(&self.pool)
.await
}

async fn exists(&self, workspace: &str, hash: &str) -> Result<bool, DbErr> {
Blobs::find_by_id((workspace.into(), hash.into()))
.filter(BlobColumn::DeletedAt.is_null())
.count(&self.pool)
.await
.map(|c| c > 0)
Expand All @@ -76,6 +80,7 @@ impl BlobDBStorage {
.select_only()
.column_as(BlobColumn::Length, "size")
.column_as(BlobColumn::CreatedAt, "created_at")
.filter(BlobColumn::DeletedAt.is_null())
.into_model::<InternalBlobMetadata>()
.one(&self.pool)
.await
Expand All @@ -86,6 +91,7 @@ impl BlobDBStorage {
pub(super) async fn get_blobs_size(&self, workspaces: &[String]) -> Result<Option<i64>, DbErr> {
Blobs::find()
.filter(BlobColumn::WorkspaceId.is_in(workspaces))
.filter(BlobColumn::DeletedAt.is_null())
.select_only()
.column_as(BlobColumn::Length.sum().cast_as(Alias::new("bigint")), "size")
.into_tuple::<Option<i64>>()
Expand All @@ -95,13 +101,23 @@ impl BlobDBStorage {
}

async fn insert(&self, workspace: &str, hash: &str, blob: &[u8]) -> Result<(), DbErr> {
if !self.exists(workspace, hash).await? {
if let Some(model) = Blobs::find_by_id((workspace.into(), hash.into()))
.one(&self.pool)
.await?
{
if model.deleted_at.is_some() {
let mut model: BlobActiveModel = model.into();
model.deleted_at = Set(None);
model.update(&self.pool).await?;
}
} else {
Blobs::insert(BlobActiveModel {
workspace_id: Set(workspace.into()),
hash: Set(hash.into()),
blob: Set(blob.into()),
length: Set(blob.len().try_into().unwrap()),
created_at: Set(Utc::now().into()),
deleted_at: Set(None),
})
.exec(&self.pool)
.await?;
Expand All @@ -112,14 +128,19 @@ impl BlobDBStorage {

pub(super) async fn get(&self, workspace: &str, hash: &str) -> JwstBlobResult<BlobModel> {
Blobs::find_by_id((workspace.into(), hash.into()))
.filter(BlobColumn::DeletedAt.is_null())
.one(&self.pool)
.await
.map_err(|e| e.into())
.and_then(|r| r.ok_or(JwstBlobError::BlobNotFound(hash.into())))
}

async fn delete(&self, workspace: &str, hash: &str) -> Result<bool, DbErr> {
Blobs::delete_by_id((workspace.into(), hash.into()))
Blobs::update_many()
.col_expr(BlobColumn::DeletedAt, Expr::value(Utc::now()))
.filter(BlobColumn::WorkspaceId.eq(workspace))
.filter(BlobColumn::Hash.eq(hash))
.filter(BlobColumn::DeletedAt.is_null())
.exec(&self.pool)
.await
.map(|r| r.rows_affected == 1)
Expand Down Expand Up @@ -262,7 +283,8 @@ pub async fn blobs_storage_test(pool: &BlobDBStorage) -> anyhow::Result<()> {
hash: "test".into(),
blob: vec![1, 2, 3, 4],
length: 4,
created_at: all.get(0).unwrap().created_at
created_at: all.get(0).unwrap().created_at,
deleted_at: None,
}]
);
assert_eq!(pool.count("basic").await?, 1);
Expand All @@ -282,7 +304,8 @@ pub async fn blobs_storage_test(pool: &BlobDBStorage) -> anyhow::Result<()> {
hash: "test1".into(),
blob: vec![1, 2, 3, 4],
length: 4,
created_at: all.get(0).unwrap().created_at
created_at: all.get(0).unwrap().created_at,
deleted_at: None,
}]
);
assert_eq!(pool.count("basic").await?, 1);
Expand All @@ -295,5 +318,11 @@ pub async fn blobs_storage_test(pool: &BlobDBStorage) -> anyhow::Result<()> {

pool.drop("basic").await?;

pool.insert("basic", "test1", &[1, 2, 3, 4]).await?;
pool.delete("basic", "test1").await?;

assert_eq!(pool.count("basic").await?, 0);
pool.drop("basic").await?;

Ok(())
}

1 comment on commit 49a6b7a

@vercel
Copy link

@vercel vercel bot commented on 49a6b7a Nov 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.