Skip to content

Commit

Permalink
feat(storage): support soft delete blobs
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Nov 24, 2023
1 parent 4824984 commit a1a5c68
Show file tree
Hide file tree
Showing 12 changed files with 121 additions and 23 deletions.
3 changes: 2 additions & 1 deletion libs/jwst-storage/src/entities/blobs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6
use sea_orm::entity::prelude::*;

Expand All @@ -13,6 +13,7 @@ pub struct Model {
pub blob: Vec<u8>,
pub length: i64,
pub created_at: DateTimeWithTimeZone,
pub deleted_at: Option<DateTimeWithTimeZone>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-storage/src/entities/bucket_blobs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6
use sea_orm::entity::prelude::*;

Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-storage/src/entities/diff_log.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6
use sea_orm::entity::prelude::*;

Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-storage/src/entities/docs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6
use sea_orm::entity::prelude::*;

Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-storage/src/entities/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6
pub mod prelude;

Expand Down
3 changes: 2 additions & 1 deletion libs/jwst-storage/src/entities/optimized_blobs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6
use sea_orm::entity::prelude::*;

Expand All @@ -15,6 +15,7 @@ pub struct Model {
pub created_at: DateTimeWithTimeZone,
#[sea_orm(primary_key, auto_increment = false)]
pub params: String,
pub deleted_at: Option<DateTimeWithTimeZone>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
11 changes: 6 additions & 5 deletions libs/jwst-storage/src/entities/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6
pub use super::{
blobs::Entity as Blobs, bucket_blobs::Entity as BucketBlobs, diff_log::Entity as DiffLog, docs::Entity as Docs,
optimized_blobs::Entity as OptimizedBlobs,
};
pub use super::blobs::Entity as Blobs;
pub use super::bucket_blobs::Entity as BucketBlobs;
pub use super::diff_log::Entity as DiffLog;
pub use super::docs::Entity as Docs;
pub use super::optimized_blobs::Entity as OptimizedBlobs;
2 changes: 2 additions & 0 deletions libs/jwst-storage/src/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod m20230321_000001_blob_optimized_table;
mod m20230614_000001_initial_bucket_blob_table;
mod m20230626_023319_doc_guid;
mod m20230814_061223_initial_diff_log_table;
mod m20231124_082401_blob_deleted_at;
mod schemas;

pub struct Migrator;
Expand All @@ -20,6 +21,7 @@ impl MigratorTrait for Migrator {
Box::new(m20230614_000001_initial_bucket_blob_table::Migration),
Box::new(m20230626_023319_doc_guid::Migration),
Box::new(m20230814_061223_initial_diff_log_table::Migration),
Box::new(m20231124_082401_blob_deleted_at::Migration),
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use sea_orm_migration::prelude::*;

use crate::schemas::{Blobs, OptimizedBlobs};

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Blobs::Table)
.add_column(ColumnDef::new(Blobs::DeletedAt).timestamp_with_time_zone().null())
.to_owned(),
)
.await?;

manager
.alter_table(
Table::alter()
.table(OptimizedBlobs::Table)
.add_column(
ColumnDef::new(OptimizedBlobs::DeletedAt)
.timestamp_with_time_zone()
.null(),
)
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Blobs::Table)
.drop_column(Blobs::DeletedAt)
.to_owned(),
)
.await?;

manager
.alter_table(
Table::alter()
.table(OptimizedBlobs::Table)
.drop_column(OptimizedBlobs::DeletedAt)
.to_owned(),
)
.await
}
}
4 changes: 4 additions & 0 deletions libs/jwst-storage/src/migration/src/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub enum Blobs {
Length,
#[iden = "created_at"]
Timestamp,
#[iden = "deleted_at"]
DeletedAt,
}

#[derive(Iden)]
Expand All @@ -36,6 +38,8 @@ pub enum OptimizedBlobs {
#[iden = "created_at"]
Timestamp,
Params,
#[iden = "deleted_at"]
DeletedAt,
}

#[derive(Iden)]
Expand Down
25 changes: 17 additions & 8 deletions libs/jwst-storage/src/storage/blobs/auto_storage/auto_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,23 @@ impl BlobAutoStorage {

async fn exists(&self, table: &str, hash: &str, params: &str) -> JwstBlobResult<bool> {
Ok(OptimizedBlobs::find_by_id((table.into(), hash.into(), params.into()))
.filter(OptimizedBlobColumn::DeletedAt.is_null())
.count(&self.pool)
.await
.map(|c| c > 0)?)
}

async fn insert(&self, table: &str, hash: &str, params: &str, blob: &[u8]) -> JwstBlobResult<()> {
if !self.exists(table, hash, params).await? {
if let Some(model) = OptimizedBlobs::find_by_id((workspace.into(), hash.into(), params.into()))
.one(&self.pool)
.await?
{
if model.deleted_at.is_some() {
let mut model: OptimizedBlobActiveModel = model.into();
model.deleted_at = Set(None);
model.update(&self.pool).await?;
}
} else {
OptimizedBlobs::insert(OptimizedBlobActiveModel {
workspace_id: Set(table.into()),
hash: Set(hash.into()),
Expand All @@ -53,6 +63,7 @@ impl BlobAutoStorage {

async fn get(&self, table: &str, hash: &str, params: &str) -> JwstBlobResult<OptimizedBlobModel> {
OptimizedBlobs::find_by_id((table.into(), hash.into(), params.into()))
.filter(OptimizedBlobColumn::DeletedAt.is_null())
.one(&self.pool)
.await
.map_err(|e| e.into())
Expand Down Expand Up @@ -142,15 +153,13 @@ impl BlobAutoStorage {
}

async fn delete(&self, table: &str, hash: &str) -> JwstBlobResult<u64> {
Ok(OptimizedBlobs::delete_many()
.filter(
OptimizedBlobColumn::WorkspaceId
.eq(table)
.and(OptimizedBlobColumn::Hash.eq(hash)),
)
OptimizedBlobs::update_many()
.col_expr(OptimizedBlobColumn::DeletedAt, Expr::value(Utc::now()))
.filter(OptimizedBlobColumn::WorkspaceId.eq(workspace))
.filter(OptimizedBlobColumn::Hash.eq(hash))
.exec(&self.pool)
.await
.map(|r| r.rows_affected)?)
.map(|r| r.rows_affected == 1)
}

async fn drop(&self, table: &str) -> Result<(), DbErr> {
Expand Down
35 changes: 31 additions & 4 deletions libs/jwst-storage/src/storage/blobs/blob_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl BlobDBStorage {
async fn all(&self, workspace: &str) -> Result<Vec<BlobModel>, DbErr> {
Blobs::find()
.filter(BlobColumn::WorkspaceId.eq(workspace))
.filter(BlobColumn::DeletedAt.is_null())
.all(&self.pool)
.await
}
Expand All @@ -48,6 +49,7 @@ impl BlobDBStorage {

Blobs::find()
.filter(BlobColumn::WorkspaceId.eq(workspace))
.filter(BlobColumn::DeletedAt.is_null())
.select_only()
.column(BlobColumn::Hash)
.into_model::<BlobHash>()
Expand All @@ -60,12 +62,14 @@ impl BlobDBStorage {
async fn count(&self, workspace: &str) -> Result<u64, DbErr> {
Blobs::find()
.filter(BlobColumn::WorkspaceId.eq(workspace))
.filter(BlobColumn::DeletedAt.is_null())
.count(&self.pool)
.await
}

async fn exists(&self, workspace: &str, hash: &str) -> Result<bool, DbErr> {
Blobs::find_by_id((workspace.into(), hash.into()))
.filter(BlobColumn::DeletedAt.is_null())
.count(&self.pool)
.await
.map(|c| c > 0)
Expand All @@ -86,6 +90,7 @@ impl BlobDBStorage {
pub(super) async fn get_blobs_size(&self, workspaces: &[String]) -> Result<Option<i64>, DbErr> {
Blobs::find()
.filter(BlobColumn::WorkspaceId.is_in(workspaces))
.filter(BlobColumn::DeletedAt.is_null())
.select_only()
.column_as(BlobColumn::Length.sum().cast_as(Alias::new("bigint")), "size")
.into_tuple::<Option<i64>>()
Expand All @@ -95,13 +100,23 @@ impl BlobDBStorage {
}

async fn insert(&self, workspace: &str, hash: &str, blob: &[u8]) -> Result<(), DbErr> {
if !self.exists(workspace, hash).await? {
if let Some(model) = Blobs::find_by_id((workspace.into(), hash.into()))
.one(&self.pool)
.await?
{
if model.deleted_at.is_some() {
let mut model: BlobActiveModel = model.into();
model.deleted_at = Set(None);
model.update(&self.pool).await?;
}
} else {
Blobs::insert(BlobActiveModel {
workspace_id: Set(workspace.into()),
hash: Set(hash.into()),
blob: Set(blob.into()),
length: Set(blob.len().try_into().unwrap()),
created_at: Set(Utc::now().into()),
deleted_at: Set(None),
})
.exec(&self.pool)
.await?;
Expand All @@ -112,14 +127,18 @@ impl BlobDBStorage {

pub(super) async fn get(&self, workspace: &str, hash: &str) -> JwstBlobResult<BlobModel> {
Blobs::find_by_id((workspace.into(), hash.into()))
.filter(BlobColumn::DeletedAt.is_null())
.one(&self.pool)
.await
.map_err(|e| e.into())
.and_then(|r| r.ok_or(JwstBlobError::BlobNotFound(hash.into())))
}

async fn delete(&self, workspace: &str, hash: &str) -> Result<bool, DbErr> {
Blobs::delete_by_id((workspace.into(), hash.into()))
Blobs::update_many()
.col_expr(BlobColumn::DeletedAt, Expr::value(Utc::now()))
.filter(BlobColumn::WorkspaceId.eq(workspace))
.filter(BlobColumn::Hash.eq(hash))
.exec(&self.pool)
.await
.map(|r| r.rows_affected == 1)
Expand Down Expand Up @@ -262,7 +281,8 @@ pub async fn blobs_storage_test(pool: &BlobDBStorage) -> anyhow::Result<()> {
hash: "test".into(),
blob: vec![1, 2, 3, 4],
length: 4,
created_at: all.get(0).unwrap().created_at
created_at: all.get(0).unwrap().created_at,
deleted_at: None,
}]
);
assert_eq!(pool.count("basic").await?, 1);
Expand All @@ -282,7 +302,8 @@ pub async fn blobs_storage_test(pool: &BlobDBStorage) -> anyhow::Result<()> {
hash: "test1".into(),
blob: vec![1, 2, 3, 4],
length: 4,
created_at: all.get(0).unwrap().created_at
created_at: all.get(0).unwrap().created_at,
deleted_at: None,
}]
);
assert_eq!(pool.count("basic").await?, 1);
Expand All @@ -295,5 +316,11 @@ pub async fn blobs_storage_test(pool: &BlobDBStorage) -> anyhow::Result<()> {

pool.drop("basic").await?;

pool.insert("basic", "test1", &[1, 2, 3, 4]).await?;
pool.delete("basic", "test1").await?;

assert_eq!(pool.count("basic").await?, 0);
pool.drop("basic").await?;

Ok(())
}

0 comments on commit a1a5c68

Please sign in to comment.