Skip to content

Commit

Permalink
Add initial support for already-moved pending blob.
Browse files Browse the repository at this point in the history
  • Loading branch information
emmiegit committed Oct 1, 2024
1 parent f43d861 commit 9621314
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 24 deletions.
95 changes: 75 additions & 20 deletions deepwell/src/services/blob/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,12 @@ impl BlobService {
ctx: &ServiceContext<'_>,
user_id: i64,
pending_blob_id: &str,
) -> Result<String> {
) -> Result<PendingBlob> {
let txn = ctx.transaction();
let row = BlobPending::find_by_id(pending_blob_id).one(txn).await?;
let BlobPendingModel {
s3_path,
s3_hash,
created_by,
..
} = match row {
Expand All @@ -149,7 +150,10 @@ impl BlobService {
return Err(Error::BlobWrongUser);
}

Ok(s3_path)
Ok(PendingBlob {
s3_path,
moved_hash: s3_hash,
})
}

pub async fn cancel_upload(
Expand All @@ -159,7 +163,9 @@ impl BlobService {
) -> Result<()> {
info!("Cancelling upload for blob for pending ID {pending_blob_id}");
let txn = ctx.transaction();
let s3_path = Self::get_pending_blob_path(ctx, user_id, pending_blob_id).await?;
let PendingBlob { s3_path, .. } =
Self::get_pending_blob_path(ctx, user_id, pending_blob_id).await?;

BlobPending::delete_by_id(pending_blob_id).exec(txn).await?;

if Self::head(ctx, &s3_path).await?.is_some() {
Expand All @@ -170,32 +176,33 @@ impl BlobService {
Ok(())
}

pub async fn finish_upload(
/// Helper function to do the actual "move" step of blob finalization.
/// This is where, after uploading to the presign URL, the S3 object is
/// then moved to its permanent location with a hashed name.
async fn move_uploaded(
ctx: &ServiceContext<'_>,
user_id: i64,
pending_blob_id: &str,
s3_path: &str,
) -> Result<FinalizeBlobUploadOutput> {
info!("Finishing upload for blob for pending ID {pending_blob_id}");
let bucket = ctx.s3_bucket();
let txn = ctx.transaction();

debug!("Getting pending blob info");
let s3_path = Self::get_pending_blob_path(ctx, user_id, pending_blob_id).await?;

debug!("Download uploaded blob from S3 uploads to get metadata");
let response = bucket.get_object(&s3_path).await?;
let response = bucket.get_object(s3_path).await?;
let data: Vec<u8> = match response.status_code() {
200 => response.into(),
404 => {
error!("No blob uploaded at presign path {s3_path}");
return Err(Error::BlobNotUploaded);
}
_ => {
error!("Cannot find blob at presign path {s3_path}");
BlobPending::delete_by_id(pending_blob_id).exec(txn).await?;
info!("Deleted pending blob due to missing presign object in S3");
return Err(Error::FileNotUploaded);
error!("Unable to retrieve uploaded blob at {s3_path} from S3");
let error = s3_error(&response, "finalizing uploaded blob")?;
return Err(error);
}
};

debug!("Deleting pending blob");
BlobPending::delete_by_id(pending_blob_id).exec(txn).await?;
// TODO compare actual data length to promised length

// Special handling for empty blobs
if data.is_empty() {
Expand Down Expand Up @@ -260,18 +267,60 @@ impl BlobService {
size,
created: true,
}),
_ => s3_error(&response, "creating final S3 blob"),
_ => s3_error(&response, "creating final S3 blob")?,
}
}
};

// Delete uploaded version, in either case
bucket.delete_object(&s3_path).await?;

// Return result based on blob status
// Update pending blob with hash
let model = blob_pending::ActiveModel {
external_id: Set(str!(pending_blob_id)),
s3_hash: Set(Some(hash.to_vec())),
..Default::default()
};
model.update(txn).await?;

// Return
result
}

/// Finalizes a pending blob upload and reports the resulting blob.
///
/// Looks up the pending blob via `get_pending_blob_path`, then takes one
/// of two paths depending on whether a hash has already been recorded for it:
///
/// * No hash recorded: the uploaded object still has to be moved, so the
///   work is delegated to `move_uploaded` and its result is returned as-is.
/// * Hash recorded: the blob was already moved by an earlier call, so only
///   its metadata is fetched and the output is reported with
///   `created: false`.
///
/// # Errors
///
/// Propagates any error from `get_pending_blob_path`, `move_uploaded`
/// or `get_metadata`.
///
/// # Panics
///
/// Panics if the recorded hash is not exactly 64 bytes long, because
/// `copy_from_slice` requires both slices to have the same length.
pub async fn finish_upload(
    ctx: &ServiceContext<'_>,
    user_id: i64,
    pending_blob_id: &str,
) -> Result<FinalizeBlobUploadOutput> {
    info!("Finishing upload for blob for pending ID {pending_blob_id}");

    let PendingBlob {
        s3_path,
        moved_hash,
    } = Self::get_pending_blob_path(ctx, user_id, pending_blob_id).await?;

    // Without a recorded hash the object has not been moved out of the
    // pending area yet, so hand off to the move step and return its result.
    let stored_hash = match moved_hash {
        Some(stored_hash) => stored_hash,
        None => return Self::move_uploaded(ctx, pending_blob_id, &s3_path).await,
    };

    // Already moved: only the metadata of the existing blob is needed.
    let BlobMetadata { mime, size, .. } = Self::get_metadata(ctx, &stored_hash).await?;

    // Convert the stored byte vector into the fixed-size hash array.
    // NOTE(review): this panics on a length mismatch; consider a fallible
    // conversion once a suitable error variant exists.
    let mut hash = [0; 64];
    hash.copy_from_slice(&stored_hash);

    Ok(FinalizeBlobUploadOutput {
        hash,
        mime,
        size,
        created: false,
    })
}

pub async fn get_optional(
ctx: &ServiceContext<'_>,
hash: &[u8],
Expand Down Expand Up @@ -432,3 +481,9 @@ fn s3_error<T>(response: &ResponseData, action: &str) -> Result<T> {
// TODO replace with S3 backend-specific error
Err(Error::S3Response)
}

/// Information about a pending blob, as returned by
/// `BlobService::get_pending_blob_path`.
#[derive(Debug)]
struct PendingBlob {
    /// The `s3_path` value read from the pending blob's database row.
    s3_path: String,
    /// The `s3_hash` value read from the pending blob's database row.
    ///
    /// `finish_upload` treats `None` as "not yet moved to its final
    /// location" and `Some(hash)` as "already moved", with `hash` used
    /// to look up the blob's metadata.
    moved_hash: Option<Vec<u8>>,
}
8 changes: 4 additions & 4 deletions deepwell/src/services/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,6 @@ pub enum Error {
#[error("File revision does not exist")]
FileRevisionNotFound,

#[error("File not uploaded")]
FileNotUploaded, // occurs when presign URL is not uploaded to

#[error("Vote does not exist")]
VoteNotFound,

Expand All @@ -239,6 +236,9 @@ pub enum Error {
#[error("Blob item does not exist")]
BlobNotFound,

#[error("Blob not uploaded")]
BlobNotUploaded,

#[error("Cannot use blob uploaded by different user")]
BlobWrongUser,

Expand Down Expand Up @@ -324,7 +324,6 @@ impl Error {
Error::MessageDraftNotFound => 2015,
Error::BlobNotFound => 2016,
Error::TextNotFound => 2017,
Error::FileNotUploaded => 2018,

// 2100 -- Existing data
Error::UserExists => 2100,
Expand Down Expand Up @@ -384,6 +383,7 @@ impl Error {
Error::MessageTooManyRecipients => 4021,
Error::BlobWrongUser => 4022,
Error::BlobTooBig => 4023,
Error::BlobNotUploaded => 4024,

// 4100 -- Localization
Error::LocaleInvalid(_) => 4100,
Expand Down

0 comments on commit 9621314

Please sign in to comment.