From 49a6b7af25ce1fe54e8383e10980e9536821d286 Mon Sep 17 00:00:00 2001 From: liuyi Date: Sat, 25 Nov 2023 17:35:24 +0800 Subject: [PATCH] feat(storage): support soft delete blobs (#561) * feat(storage): support soft delete blobs * fix: lint & delete test case --------- Co-authored-by: DarkSky --- .../src/doc_operation/yrs_op/mod.rs | 4 +- libs/jwst-storage/src/entities/blobs.rs | 3 +- .../jwst-storage/src/entities/bucket_blobs.rs | 2 +- libs/jwst-storage/src/entities/diff_log.rs | 2 +- libs/jwst-storage/src/entities/docs.rs | 2 +- libs/jwst-storage/src/entities/mod.rs | 2 +- .../src/entities/optimized_blobs.rs | 3 +- libs/jwst-storage/src/entities/prelude.rs | 11 ++-- libs/jwst-storage/src/migration/src/lib.rs | 2 + .../src/m20231124_082401_blob_deleted_at.rs | 53 +++++++++++++++++++ .../jwst-storage/src/migration/src/schemas.rs | 4 ++ .../blobs/auto_storage/auto_storage.rs | 34 ++++++++---- .../src/storage/blobs/blob_storage.rs | 37 +++++++++++-- 13 files changed, 131 insertions(+), 28 deletions(-) create mode 100644 libs/jwst-storage/src/migration/src/m20231124_082401_blob_deleted_at.rs diff --git a/libs/jwst-codec-utils/src/doc_operation/yrs_op/mod.rs b/libs/jwst-codec-utils/src/doc_operation/yrs_op/mod.rs index 749ea96f..dbb0c861 100644 --- a/libs/jwst-codec-utils/src/doc_operation/yrs_op/mod.rs +++ b/libs/jwst-codec-utils/src/doc_operation/yrs_op/mod.rs @@ -88,7 +88,7 @@ pub fn yrs_create_nest_type_from_root(doc: &yrs::Doc, target_type: CRDTNestType, } } -pub fn gen_nest_type_from_root(doc: &mut Doc, crdt_param: &CRDTParam) -> Option { +pub fn gen_nest_type_from_root(doc: &Doc, crdt_param: &CRDTParam) -> Option { match crdt_param.new_nest_type { CRDTNestType::Array => Some(yrs_create_nest_type_from_root( doc, @@ -124,7 +124,7 @@ pub fn gen_nest_type_from_root(doc: &mut Doc, crdt_param: &CRDTParam) -> Option< } pub fn gen_nest_type_from_nest_type( - doc: &mut Doc, + doc: &Doc, crdt_param: CRDTParam, nest_type: &mut YrsNestType, ) -> Option { diff --git a/libs/jwst-storage/src/entities/blobs.rs b/libs/jwst-storage/src/entities/blobs.rs index 4470efff..75594149 100644 --- a/libs/jwst-storage/src/entities/blobs.rs +++ b/libs/jwst-storage/src/entities/blobs.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6 use sea_orm::entity::prelude::*; @@ -13,6 +13,7 @@ pub struct Model { pub blob: Vec, pub length: i64, pub created_at: DateTimeWithTimeZone, + pub deleted_at: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/libs/jwst-storage/src/entities/bucket_blobs.rs b/libs/jwst-storage/src/entities/bucket_blobs.rs index 8b92b05e..1bb541aa 100644 --- a/libs/jwst-storage/src/entities/bucket_blobs.rs +++ b/libs/jwst-storage/src/entities/bucket_blobs.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6 use sea_orm::entity::prelude::*; diff --git a/libs/jwst-storage/src/entities/diff_log.rs b/libs/jwst-storage/src/entities/diff_log.rs index f3e56c37..a105c75b 100644 --- a/libs/jwst-storage/src/entities/diff_log.rs +++ b/libs/jwst-storage/src/entities/diff_log.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6 use sea_orm::entity::prelude::*; diff --git a/libs/jwst-storage/src/entities/docs.rs b/libs/jwst-storage/src/entities/docs.rs index 2492bd5b..0075209b 100644 --- a/libs/jwst-storage/src/entities/docs.rs +++ b/libs/jwst-storage/src/entities/docs.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6 use sea_orm::entity::prelude::*; diff --git a/libs/jwst-storage/src/entities/mod.rs b/libs/jwst-storage/src/entities/mod.rs index 2425a9d7..6797fce6 100644 --- a/libs/jwst-storage/src/entities/mod.rs +++ b/libs/jwst-storage/src/entities/mod.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6 pub mod prelude; diff --git a/libs/jwst-storage/src/entities/optimized_blobs.rs b/libs/jwst-storage/src/entities/optimized_blobs.rs index a1b58188..1735caf8 100644 --- a/libs/jwst-storage/src/entities/optimized_blobs.rs +++ b/libs/jwst-storage/src/entities/optimized_blobs.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6 use sea_orm::entity::prelude::*; @@ -15,6 +15,7 @@ pub struct Model { pub created_at: DateTimeWithTimeZone, #[sea_orm(primary_key, auto_increment = false)] pub params: String, + pub deleted_at: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/libs/jwst-storage/src/entities/prelude.rs b/libs/jwst-storage/src/entities/prelude.rs index aaabca6a..755a6f4d 100644 --- a/libs/jwst-storage/src/entities/prelude.rs +++ b/libs/jwst-storage/src/entities/prelude.rs @@ -1,6 +1,7 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6 -pub use super::{ - blobs::Entity as Blobs, bucket_blobs::Entity as BucketBlobs, diff_log::Entity as DiffLog, docs::Entity as Docs, - optimized_blobs::Entity as OptimizedBlobs, -}; +pub use super::blobs::Entity as Blobs; +pub use super::bucket_blobs::Entity as BucketBlobs; +pub use super::diff_log::Entity as DiffLog; +pub use super::docs::Entity as Docs; +pub use super::optimized_blobs::Entity as OptimizedBlobs; diff --git a/libs/jwst-storage/src/migration/src/lib.rs b/libs/jwst-storage/src/migration/src/lib.rs index 8861c8ec..71ea3200 100644 --- a/libs/jwst-storage/src/migration/src/lib.rs +++ b/libs/jwst-storage/src/migration/src/lib.rs @@ -6,6 +6,7 @@ mod m20230321_000001_blob_optimized_table; mod m20230614_000001_initial_bucket_blob_table; mod m20230626_023319_doc_guid; mod m20230814_061223_initial_diff_log_table; +mod m20231124_082401_blob_deleted_at; mod schemas; pub struct Migrator; @@ -20,6 +21,7 @@ impl MigratorTrait for Migrator { Box::new(m20230614_000001_initial_bucket_blob_table::Migration), Box::new(m20230626_023319_doc_guid::Migration), Box::new(m20230814_061223_initial_diff_log_table::Migration), + Box::new(m20231124_082401_blob_deleted_at::Migration), ] } } diff --git a/libs/jwst-storage/src/migration/src/m20231124_082401_blob_deleted_at.rs b/libs/jwst-storage/src/migration/src/m20231124_082401_blob_deleted_at.rs new file mode 100644 index 00000000..d784a487 --- /dev/null +++ b/libs/jwst-storage/src/migration/src/m20231124_082401_blob_deleted_at.rs @@ -0,0 +1,53 @@ +use sea_orm_migration::prelude::*; + +use crate::schemas::{Blobs, OptimizedBlobs}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Blobs::Table) + .add_column(ColumnDef::new(Blobs::DeletedAt).timestamp_with_time_zone().null()) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(OptimizedBlobs::Table) + .add_column( + ColumnDef::new(OptimizedBlobs::DeletedAt) + .timestamp_with_time_zone() + .null(), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Blobs::Table) + .drop_column(Blobs::DeletedAt) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(OptimizedBlobs::Table) + .drop_column(OptimizedBlobs::DeletedAt) + .to_owned(), + ) + .await + } +} diff --git a/libs/jwst-storage/src/migration/src/schemas.rs b/libs/jwst-storage/src/migration/src/schemas.rs index b3d6eeb3..ee4cce29 100644 --- a/libs/jwst-storage/src/migration/src/schemas.rs +++ b/libs/jwst-storage/src/migration/src/schemas.rs @@ -10,6 +10,8 @@ pub enum Blobs { Length, #[iden = "created_at"] Timestamp, + #[iden = "deleted_at"] + DeletedAt, } #[derive(Iden)] @@ -36,6 +38,8 @@ pub enum OptimizedBlobs { #[iden = "created_at"] Timestamp, Params, + #[iden = "deleted_at"] + DeletedAt, } #[derive(Iden)] diff --git a/libs/jwst-storage/src/storage/blobs/auto_storage/auto_storage.rs b/libs/jwst-storage/src/storage/blobs/auto_storage/auto_storage.rs index 7278226c..e522388c 100644 --- a/libs/jwst-storage/src/storage/blobs/auto_storage/auto_storage.rs +++ b/libs/jwst-storage/src/storage/blobs/auto_storage/auto_storage.rs @@ -29,20 +29,31 @@ impl BlobAutoStorage { async fn exists(&self, table: &str, hash: &str, params: &str) -> JwstBlobResult { Ok(OptimizedBlobs::find_by_id((table.into(), hash.into(), params.into())) + .filter(OptimizedBlobColumn::DeletedAt.is_null()) .count(&self.pool) .await .map(|c| c > 0)?) } - async fn insert(&self, table: &str, hash: &str, params: &str, blob: &[u8]) -> JwstBlobResult<()> { - if !self.exists(table, hash, params).await? { + async fn insert(&self, workspace: &str, hash: &str, params: &str, blob: &[u8]) -> JwstBlobResult<()> { + if let Some(model) = OptimizedBlobs::find_by_id((workspace.into(), hash.into(), params.into())) + .one(&self.pool) + .await? + { + if model.deleted_at.is_some() { + let mut model: OptimizedBlobActiveModel = model.into(); + model.deleted_at = Set(None); + model.update(&self.pool).await?; + } + } else { OptimizedBlobs::insert(OptimizedBlobActiveModel { - workspace_id: Set(table.into()), + workspace_id: Set(workspace.into()), hash: Set(hash.into()), blob: Set(blob.into()), length: Set(blob.len().try_into().unwrap()), params: Set(params.into()), created_at: Set(Utc::now().into()), + deleted_at: Set(None), }) .exec(&self.pool) .await?; @@ -53,6 +64,7 @@ impl BlobAutoStorage { async fn get(&self, table: &str, hash: &str, params: &str) -> JwstBlobResult { OptimizedBlobs::find_by_id((table.into(), hash.into(), params.into())) + .filter(OptimizedBlobColumn::DeletedAt.is_null()) .one(&self.pool) .await .map_err(|e| e.into()) @@ -141,16 +153,16 @@ impl BlobAutoStorage { } } - async fn delete(&self, table: &str, hash: &str) -> JwstBlobResult { - Ok(OptimizedBlobs::delete_many() - .filter( - OptimizedBlobColumn::WorkspaceId - .eq(table) - .and(OptimizedBlobColumn::Hash.eq(hash)), - ) + async fn delete(&self, workspace: &str, hash: &str) -> JwstBlobResult { + OptimizedBlobs::update_many() + .col_expr(OptimizedBlobColumn::DeletedAt, Expr::value(Utc::now())) + .filter(OptimizedBlobColumn::WorkspaceId.eq(workspace)) + .filter(OptimizedBlobColumn::Hash.eq(hash)) + .filter(OptimizedBlobColumn::DeletedAt.is_null()) .exec(&self.pool) .await - .map(|r| r.rows_affected)?) + .map(|r| r.rows_affected) + .map_err(|e| e.into()) } async fn drop(&self, table: &str) -> Result<(), DbErr> { diff --git a/libs/jwst-storage/src/storage/blobs/blob_storage.rs b/libs/jwst-storage/src/storage/blobs/blob_storage.rs index 726157bb..84dbfbdb 100644 --- a/libs/jwst-storage/src/storage/blobs/blob_storage.rs +++ b/libs/jwst-storage/src/storage/blobs/blob_storage.rs @@ -36,6 +36,7 @@ impl BlobDBStorage { async fn all(&self, workspace: &str) -> Result, DbErr> { Blobs::find() .filter(BlobColumn::WorkspaceId.eq(workspace)) + .filter(BlobColumn::DeletedAt.is_null()) .all(&self.pool) .await } @@ -48,6 +49,7 @@ impl BlobDBStorage { Blobs::find() .filter(BlobColumn::WorkspaceId.eq(workspace)) + .filter(BlobColumn::DeletedAt.is_null()) .select_only() .column(BlobColumn::Hash) .into_model::() @@ -60,12 +62,14 @@ impl BlobDBStorage { async fn count(&self, workspace: &str) -> Result { Blobs::find() .filter(BlobColumn::WorkspaceId.eq(workspace)) + .filter(BlobColumn::DeletedAt.is_null()) .count(&self.pool) .await } async fn exists(&self, workspace: &str, hash: &str) -> Result { Blobs::find_by_id((workspace.into(), hash.into())) + .filter(BlobColumn::DeletedAt.is_null()) .count(&self.pool) .await .map(|c| c > 0) @@ -76,6 +80,7 @@ impl BlobDBStorage { .select_only() .column_as(BlobColumn::Length, "size") .column_as(BlobColumn::CreatedAt, "created_at") + .filter(BlobColumn::DeletedAt.is_null()) .into_model::() .one(&self.pool) .await @@ -86,6 +91,7 @@ impl BlobDBStorage { pub(super) async fn get_blobs_size(&self, workspaces: &[String]) -> Result, DbErr> { Blobs::find() .filter(BlobColumn::WorkspaceId.is_in(workspaces)) + .filter(BlobColumn::DeletedAt.is_null()) .select_only() .column_as(BlobColumn::Length.sum().cast_as(Alias::new("bigint")), "size") .into_tuple::>() @@ -95,13 +101,23 @@ impl BlobDBStorage { } async fn insert(&self, workspace: &str, hash: &str, blob: &[u8]) -> Result<(), DbErr> { - if !self.exists(workspace, hash).await? { + if let Some(model) = Blobs::find_by_id((workspace.into(), hash.into())) + .one(&self.pool) + .await? + { + if model.deleted_at.is_some() { + let mut model: BlobActiveModel = model.into(); + model.deleted_at = Set(None); + model.update(&self.pool).await?; + } + } else { Blobs::insert(BlobActiveModel { workspace_id: Set(workspace.into()), hash: Set(hash.into()), blob: Set(blob.into()), length: Set(blob.len().try_into().unwrap()), created_at: Set(Utc::now().into()), + deleted_at: Set(None), }) .exec(&self.pool) .await?; @@ -112,6 +128,7 @@ impl BlobDBStorage { pub(super) async fn get(&self, workspace: &str, hash: &str) -> JwstBlobResult { Blobs::find_by_id((workspace.into(), hash.into())) + .filter(BlobColumn::DeletedAt.is_null()) .one(&self.pool) .await .map_err(|e| e.into()) @@ -119,7 +136,11 @@ impl BlobDBStorage { } async fn delete(&self, workspace: &str, hash: &str) -> Result { - Blobs::delete_by_id((workspace.into(), hash.into())) + Blobs::update_many() + .col_expr(BlobColumn::DeletedAt, Expr::value(Utc::now())) + .filter(BlobColumn::WorkspaceId.eq(workspace)) + .filter(BlobColumn::Hash.eq(hash)) + .filter(BlobColumn::DeletedAt.is_null()) .exec(&self.pool) .await .map(|r| r.rows_affected == 1) @@ -262,7 +283,8 @@ pub async fn blobs_storage_test(pool: &BlobDBStorage) -> anyhow::Result<()> { hash: "test".into(), blob: vec![1, 2, 3, 4], length: 4, - created_at: all.get(0).unwrap().created_at + created_at: all.get(0).unwrap().created_at, + deleted_at: None, }] ); assert_eq!(pool.count("basic").await?, 1); @@ -282,7 +304,8 @@ pub async fn blobs_storage_test(pool: &BlobDBStorage) -> anyhow::Result<()> { hash: "test1".into(), blob: vec![1, 2, 3, 4], length: 4, - created_at: all.get(0).unwrap().created_at + created_at: all.get(0).unwrap().created_at, + deleted_at: None, }] ); assert_eq!(pool.count("basic").await?, 1); @@ -295,5 +318,11 @@ pub async fn blobs_storage_test(pool: &BlobDBStorage) -> anyhow::Result<()> { pool.drop("basic").await?; + pool.insert("basic", "test1", &[1, 2, 3, 4]).await?; + pool.delete("basic", "test1").await?; + + assert_eq!(pool.count("basic").await?, 0); + pool.drop("basic").await?; + Ok(()) }