Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): support soft delete blobs #561

Merged
merged 2 commits into from
Nov 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion libs/jwst-storage/src/entities/blobs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6

use sea_orm::entity::prelude::*;

Expand All @@ -13,6 +13,7 @@ pub struct Model {
pub blob: Vec<u8>,
pub length: i64,
pub created_at: DateTimeWithTimeZone,
pub deleted_at: Option<DateTimeWithTimeZone>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-storage/src/entities/bucket_blobs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6

use sea_orm::entity::prelude::*;

Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-storage/src/entities/diff_log.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6

use sea_orm::entity::prelude::*;

Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-storage/src/entities/docs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6

use sea_orm::entity::prelude::*;

Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-storage/src/entities/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6

pub mod prelude;

Expand Down
3 changes: 2 additions & 1 deletion libs/jwst-storage/src/entities/optimized_blobs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6

use sea_orm::entity::prelude::*;

Expand All @@ -15,6 +15,7 @@ pub struct Model {
pub created_at: DateTimeWithTimeZone,
#[sea_orm(primary_key, auto_increment = false)]
pub params: String,
pub deleted_at: Option<DateTimeWithTimeZone>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
11 changes: 6 additions & 5 deletions libs/jwst-storage/src/entities/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6

pub use super::{
blobs::Entity as Blobs, bucket_blobs::Entity as BucketBlobs, diff_log::Entity as DiffLog, docs::Entity as Docs,
optimized_blobs::Entity as OptimizedBlobs,
};
pub use super::blobs::Entity as Blobs;
pub use super::bucket_blobs::Entity as BucketBlobs;
pub use super::diff_log::Entity as DiffLog;
pub use super::docs::Entity as Docs;
pub use super::optimized_blobs::Entity as OptimizedBlobs;
2 changes: 2 additions & 0 deletions libs/jwst-storage/src/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod m20230321_000001_blob_optimized_table;
mod m20230614_000001_initial_bucket_blob_table;
mod m20230626_023319_doc_guid;
mod m20230814_061223_initial_diff_log_table;
mod m20231124_082401_blob_deleted_at;
mod schemas;

pub struct Migrator;
Expand All @@ -20,6 +21,7 @@ impl MigratorTrait for Migrator {
Box::new(m20230614_000001_initial_bucket_blob_table::Migration),
Box::new(m20230626_023319_doc_guid::Migration),
Box::new(m20230814_061223_initial_diff_log_table::Migration),
Box::new(m20231124_082401_blob_deleted_at::Migration),
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use sea_orm_migration::prelude::*;

use crate::schemas::{Blobs, OptimizedBlobs};

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Blobs::Table)
.add_column(ColumnDef::new(Blobs::DeletedAt).timestamp_with_time_zone().null())
.to_owned(),
)
.await?;

manager
.alter_table(
Table::alter()
.table(OptimizedBlobs::Table)
.add_column(
ColumnDef::new(OptimizedBlobs::DeletedAt)
.timestamp_with_time_zone()
.null(),
)
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Blobs::Table)
.drop_column(Blobs::DeletedAt)
.to_owned(),
)
.await?;

manager
.alter_table(
Table::alter()
.table(OptimizedBlobs::Table)
.drop_column(OptimizedBlobs::DeletedAt)
.to_owned(),
)
.await
}
}
4 changes: 4 additions & 0 deletions libs/jwst-storage/src/migration/src/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub enum Blobs {
Length,
#[iden = "created_at"]
Timestamp,
#[iden = "deleted_at"]
DeletedAt,
}

#[derive(Iden)]
Expand All @@ -36,6 +38,8 @@ pub enum OptimizedBlobs {
#[iden = "created_at"]
Timestamp,
Params,
#[iden = "deleted_at"]
DeletedAt,
}

#[derive(Iden)]
Expand Down
25 changes: 17 additions & 8 deletions libs/jwst-storage/src/storage/blobs/auto_storage/auto_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,23 @@ impl BlobAutoStorage {

async fn exists(&self, table: &str, hash: &str, params: &str) -> JwstBlobResult<bool> {
Ok(OptimizedBlobs::find_by_id((table.into(), hash.into(), params.into()))
.filter(OptimizedBlobColumn::DeletedAt.is_null())
.count(&self.pool)
.await
.map(|c| c > 0)?)
}

async fn insert(&self, table: &str, hash: &str, params: &str, blob: &[u8]) -> JwstBlobResult<()> {
if !self.exists(table, hash, params).await? {
if let Some(model) = OptimizedBlobs::find_by_id((workspace.into(), hash.into(), params.into()))
.one(&self.pool)
.await?
{
if model.deleted_at.is_some() {
let mut model: OptimizedBlobActiveModel = model.into();
model.deleted_at = Set(None);
model.update(&self.pool).await?;
}
} else {
OptimizedBlobs::insert(OptimizedBlobActiveModel {
workspace_id: Set(table.into()),
hash: Set(hash.into()),
Expand All @@ -53,6 +63,7 @@ impl BlobAutoStorage {

async fn get(&self, table: &str, hash: &str, params: &str) -> JwstBlobResult<OptimizedBlobModel> {
OptimizedBlobs::find_by_id((table.into(), hash.into(), params.into()))
.filter(OptimizedBlobColumn::DeletedAt.is_null())
.one(&self.pool)
.await
.map_err(|e| e.into())
Expand Down Expand Up @@ -142,15 +153,13 @@ impl BlobAutoStorage {
}

async fn delete(&self, table: &str, hash: &str) -> JwstBlobResult<u64> {
Ok(OptimizedBlobs::delete_many()
.filter(
OptimizedBlobColumn::WorkspaceId
.eq(table)
.and(OptimizedBlobColumn::Hash.eq(hash)),
)
OptimizedBlobs::update_many()
.col_expr(OptimizedBlobColumn::DeletedAt, Expr::value(Utc::now()))
.filter(OptimizedBlobColumn::WorkspaceId.eq(workspace))
.filter(OptimizedBlobColumn::Hash.eq(hash))
.exec(&self.pool)
.await
.map(|r| r.rows_affected)?)
.map(|r| r.rows_affected == 1)
}

async fn drop(&self, table: &str) -> Result<(), DbErr> {
Expand Down
35 changes: 31 additions & 4 deletions libs/jwst-storage/src/storage/blobs/blob_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl BlobDBStorage {
async fn all(&self, workspace: &str) -> Result<Vec<BlobModel>, DbErr> {
Blobs::find()
.filter(BlobColumn::WorkspaceId.eq(workspace))
.filter(BlobColumn::DeletedAt.is_null())
.all(&self.pool)
.await
}
Expand All @@ -48,6 +49,7 @@ impl BlobDBStorage {

Blobs::find()
.filter(BlobColumn::WorkspaceId.eq(workspace))
.filter(BlobColumn::DeletedAt.is_null())
.select_only()
.column(BlobColumn::Hash)
.into_model::<BlobHash>()
Expand All @@ -60,12 +62,14 @@ impl BlobDBStorage {
async fn count(&self, workspace: &str) -> Result<u64, DbErr> {
Blobs::find()
.filter(BlobColumn::WorkspaceId.eq(workspace))
.filter(BlobColumn::DeletedAt.is_null())
.count(&self.pool)
.await
}

async fn exists(&self, workspace: &str, hash: &str) -> Result<bool, DbErr> {
Blobs::find_by_id((workspace.into(), hash.into()))
.filter(BlobColumn::DeletedAt.is_null())
.count(&self.pool)
.await
.map(|c| c > 0)
Expand All @@ -86,6 +90,7 @@ impl BlobDBStorage {
pub(super) async fn get_blobs_size(&self, workspaces: &[String]) -> Result<Option<i64>, DbErr> {
Blobs::find()
.filter(BlobColumn::WorkspaceId.is_in(workspaces))
.filter(BlobColumn::DeletedAt.is_null())
.select_only()
.column_as(BlobColumn::Length.sum().cast_as(Alias::new("bigint")), "size")
.into_tuple::<Option<i64>>()
Expand All @@ -95,13 +100,23 @@ impl BlobDBStorage {
}

async fn insert(&self, workspace: &str, hash: &str, blob: &[u8]) -> Result<(), DbErr> {
if !self.exists(workspace, hash).await? {
if let Some(model) = Blobs::find_by_id((workspace.into(), hash.into()))
.one(&self.pool)
.await?
{
if model.deleted_at.is_some() {
let mut model: BlobActiveModel = model.into();
model.deleted_at = Set(None);
model.update(&self.pool).await?;
}
} else {
Blobs::insert(BlobActiveModel {
workspace_id: Set(workspace.into()),
hash: Set(hash.into()),
blob: Set(blob.into()),
length: Set(blob.len().try_into().unwrap()),
created_at: Set(Utc::now().into()),
deleted_at: Set(None),
})
.exec(&self.pool)
.await?;
Expand All @@ -112,14 +127,18 @@ impl BlobDBStorage {

pub(super) async fn get(&self, workspace: &str, hash: &str) -> JwstBlobResult<BlobModel> {
Blobs::find_by_id((workspace.into(), hash.into()))
.filter(BlobColumn::DeletedAt.is_null())
.one(&self.pool)
.await
.map_err(|e| e.into())
.and_then(|r| r.ok_or(JwstBlobError::BlobNotFound(hash.into())))
}

async fn delete(&self, workspace: &str, hash: &str) -> Result<bool, DbErr> {
Blobs::delete_by_id((workspace.into(), hash.into()))
Blobs::update_many()
.col_expr(BlobColumn::DeletedAt, Expr::value(Utc::now()))
.filter(BlobColumn::WorkspaceId.eq(workspace))
.filter(BlobColumn::Hash.eq(hash))
.exec(&self.pool)
.await
.map(|r| r.rows_affected == 1)
Expand Down Expand Up @@ -262,7 +281,8 @@ pub async fn blobs_storage_test(pool: &BlobDBStorage) -> anyhow::Result<()> {
hash: "test".into(),
blob: vec![1, 2, 3, 4],
length: 4,
created_at: all.get(0).unwrap().created_at
created_at: all.get(0).unwrap().created_at,
deleted_at: None,
}]
);
assert_eq!(pool.count("basic").await?, 1);
Expand All @@ -282,7 +302,8 @@ pub async fn blobs_storage_test(pool: &BlobDBStorage) -> anyhow::Result<()> {
hash: "test1".into(),
blob: vec![1, 2, 3, 4],
length: 4,
created_at: all.get(0).unwrap().created_at
created_at: all.get(0).unwrap().created_at,
deleted_at: None,
}]
);
assert_eq!(pool.count("basic").await?, 1);
Expand All @@ -295,5 +316,11 @@ pub async fn blobs_storage_test(pool: &BlobDBStorage) -> anyhow::Result<()> {

pool.drop("basic").await?;

pool.insert("basic", "test1", &[1, 2, 3, 4]).await?;
pool.delete("basic", "test1").await?;

assert_eq!(pool.count("basic").await?, 0);
pool.drop("basic").await?;

Ok(())
}
Loading