From 92e5320d40db824183d6ad38c8b2ac6dc3711074 Mon Sep 17 00:00:00 2001 From: nathan Date: Wed, 9 Oct 2024 20:34:37 +0800 Subject: [PATCH] fix: end of file when unzip file --- Cargo.lock | 15 ++- Cargo.toml | 14 +- .../src/database/database_collab.rs | 4 +- .../src/document/getting_started.rs | 23 +--- libs/workspace-template/src/lib.rs | 14 +- .../src/tests/getting_started_tests.rs | 9 +- services/appflowy-worker/Cargo.toml | 1 + services/appflowy-worker/src/application.rs | 2 +- .../src/import_worker/unzip.rs | 55 ++++++-- .../src/import_worker/worker.rs | 127 ++++++++++++------ services/appflowy-worker/src/s3_client.rs | 7 +- services/appflowy-worker/tests/import_test.rs | 2 +- 12 files changed, 169 insertions(+), 104 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 83fff1ee5..3176237c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2230,7 +2230,7 @@ dependencies = [ [[package]] name = "collab" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=907ca13edd948effde6a4cbd0f5c25e16519b3cb#907ca13edd948effde6a4cbd0f5c25e16519b3cb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2934851d337c7d322c5dd7a944c05cb7ffb930d4#2934851d337c7d322c5dd7a944c05cb7ffb930d4" dependencies = [ "anyhow", "arc-swap", @@ -2255,7 +2255,7 @@ dependencies = [ [[package]] name = "collab-database" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=907ca13edd948effde6a4cbd0f5c25e16519b3cb#907ca13edd948effde6a4cbd0f5c25e16519b3cb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2934851d337c7d322c5dd7a944c05cb7ffb930d4#2934851d337c7d322c5dd7a944c05cb7ffb930d4" dependencies = [ "anyhow", "async-trait", @@ -2294,7 +2294,7 @@ dependencies = [ [[package]] name = "collab-document" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=907ca13edd948effde6a4cbd0f5c25e16519b3cb#907ca13edd948effde6a4cbd0f5c25e16519b3cb" +source = 
"git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2934851d337c7d322c5dd7a944c05cb7ffb930d4#2934851d337c7d322c5dd7a944c05cb7ffb930d4" dependencies = [ "anyhow", "arc-swap", @@ -2315,7 +2315,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=907ca13edd948effde6a4cbd0f5c25e16519b3cb#907ca13edd948effde6a4cbd0f5c25e16519b3cb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2934851d337c7d322c5dd7a944c05cb7ffb930d4#2934851d337c7d322c5dd7a944c05cb7ffb930d4" dependencies = [ "anyhow", "bytes", @@ -2335,7 +2335,7 @@ dependencies = [ [[package]] name = "collab-folder" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=907ca13edd948effde6a4cbd0f5c25e16519b3cb#907ca13edd948effde6a4cbd0f5c25e16519b3cb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2934851d337c7d322c5dd7a944c05cb7ffb930d4#2934851d337c7d322c5dd7a944c05cb7ffb930d4" dependencies = [ "anyhow", "arc-swap", @@ -2357,7 +2357,7 @@ dependencies = [ [[package]] name = "collab-importer" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=907ca13edd948effde6a4cbd0f5c25e16519b3cb#907ca13edd948effde6a4cbd0f5c25e16519b3cb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2934851d337c7d322c5dd7a944c05cb7ffb930d4#2934851d337c7d322c5dd7a944c05cb7ffb930d4" dependencies = [ "anyhow", "async-recursion", @@ -2371,6 +2371,7 @@ dependencies = [ "collab-folder", "fancy-regex 0.13.0", "futures", + "futures-util", "fxhash", "hex", "markdown", @@ -2454,7 +2455,7 @@ dependencies = [ [[package]] name = "collab-user" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=907ca13edd948effde6a4cbd0f5c25e16519b3cb#907ca13edd948effde6a4cbd0f5c25e16519b3cb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2934851d337c7d322c5dd7a944c05cb7ffb930d4#2934851d337c7d322c5dd7a944c05cb7ffb930d4" 
dependencies = [ "anyhow", "collab", diff --git a/Cargo.toml b/Cargo.toml index c150483f7..de5fa657d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -293,13 +293,13 @@ debug = true [patch.crates-io] # It's diffcult to resovle different version with the same crate used in AppFlowy Frontend and the Client-API crate. # So using patch to workaround this issue. -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "907ca13edd948effde6a4cbd0f5c25e16519b3cb" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "907ca13edd948effde6a4cbd0f5c25e16519b3cb" } -collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "907ca13edd948effde6a4cbd0f5c25e16519b3cb" } -collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "907ca13edd948effde6a4cbd0f5c25e16519b3cb" } -collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "907ca13edd948effde6a4cbd0f5c25e16519b3cb" } -collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "907ca13edd948effde6a4cbd0f5c25e16519b3cb" } -collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "907ca13edd948effde6a4cbd0f5c25e16519b3cb" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2934851d337c7d322c5dd7a944c05cb7ffb930d4" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2934851d337c7d322c5dd7a944c05cb7ffb930d4" } +collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2934851d337c7d322c5dd7a944c05cb7ffb930d4" } +collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2934851d337c7d322c5dd7a944c05cb7ffb930d4" } +collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2934851d337c7d322c5dd7a944c05cb7ffb930d4" } +collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2934851d337c7d322c5dd7a944c05cb7ffb930d4" } 
+collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2934851d337c7d322c5dd7a944c05cb7ffb930d4" } [features] history = [] diff --git a/libs/workspace-template/src/database/database_collab.rs b/libs/workspace-template/src/database/database_collab.rs index 28d0b1449..214e0425d 100644 --- a/libs/workspace-template/src/database/database_collab.rs +++ b/libs/workspace-template/src/database/database_collab.rs @@ -12,8 +12,8 @@ pub async fn create_database_collab( collab_service, notifier: Default::default(), }; - Database::create_with_view(params, context) - .await? + let database = Database::create_with_view(params, context).await?; + database .encode_database_collabs() .await .map_err(|e| anyhow::anyhow!("Failed to encode database collabs: {:?}", e)) diff --git a/libs/workspace-template/src/document/getting_started.rs b/libs/workspace-template/src/document/getting_started.rs index 23f0c3b9b..f175530df 100644 --- a/libs/workspace-template/src/document/getting_started.rs +++ b/libs/workspace-template/src/document/getting_started.rs @@ -1,10 +1,9 @@ use std::collections::HashMap; -use std::sync::Arc; use anyhow::Error; use async_trait::async_trait; use collab::core::origin::CollabOrigin; -use collab::lock::RwLock; + use collab::preclude::Collab; use collab_database::database::{timestamp, DatabaseData}; use collab_database::entity::CreateDatabaseParams; @@ -66,12 +65,7 @@ impl GettingStartedTemplate { create_database_params: CreateDatabaseParams, ) -> anyhow::Result> { let database_id = create_database_params.database_id.clone(); - let encoded_database = tokio::task::spawn_blocking({ - let create_database_params = create_database_params.clone(); - move || create_database_collab(create_database_params) - }) - .await? 
- .await?; + let encoded_database = create_database_collab(create_database_params).await?; let encoded_database_collab = encoded_database .encoded_database_collab @@ -236,7 +230,7 @@ impl WorkspaceTemplate for GettingStartedTemplate { async fn create_workspace_view( &self, _uid: i64, - workspace_view_builder: Arc>, + workspace_view_builder: &mut WorkspaceViewBuilder, ) -> anyhow::Result> { let general_view_uuid = gen_view_id().to_string(); let shared_view_uuid = gen_view_id().to_string(); @@ -263,12 +257,10 @@ impl WorkspaceTemplate for GettingStartedTemplate { ) .await?; - let mut builder = workspace_view_builder.write().await; - // Create general space with 2 built-in views: Getting started, To-dos // The Getting started view is a document view, and the To-dos view is a board view // The Getting started view contains 2 sub views: Desktop guide, Mobile guide - builder + workspace_view_builder .with_view_builder(|view_builder| async { let created_at = timestamp(); let mut view_builder = view_builder @@ -305,7 +297,7 @@ impl WorkspaceTemplate for GettingStartedTemplate { .await; // Create shared space without any built-in views - builder + workspace_view_builder .with_view_builder(|view_builder| async { let created_at = timestamp(); let view_builder = view_builder @@ -371,12 +363,11 @@ impl WorkspaceTemplate for DocumentTemplate { async fn create_workspace_view( &self, _uid: i64, - workspace_view_builder: Arc>, + workspace_view_builder: &mut WorkspaceViewBuilder, ) -> anyhow::Result> { let view_id = gen_view_id().to_string(); - let mut builder = workspace_view_builder.write().await; - builder + workspace_view_builder .with_view_builder(|view_builder| async { view_builder .with_name("Getting started") diff --git a/libs/workspace-template/src/lib.rs b/libs/workspace-template/src/lib.rs index 30c3a69f1..c85b94cf8 100644 --- a/libs/workspace-template/src/lib.rs +++ b/libs/workspace-template/src/lib.rs @@ -5,7 +5,7 @@ pub use anyhow::Result; use async_trait::async_trait; 
use collab::core::origin::CollabOrigin; use collab::entity::EncodedCollab; -use collab::lock::RwLock; + use collab::preclude::Collab; use collab_entity::CollabType; use collab_folder::{ @@ -30,7 +30,7 @@ pub trait WorkspaceTemplate { async fn create_workspace_view( &self, uid: i64, - workspace_view_builder: Arc>, + workspace_view_builder: &mut WorkspaceViewBuilder, ) -> Result>; } @@ -91,22 +91,18 @@ impl WorkspaceTemplateBuilder { } pub async fn build(&self) -> Result> { - let workspace_view_builder = Arc::new(RwLock::from(WorkspaceViewBuilder::new( - self.workspace_id.clone(), - self.uid, - ))); - + let mut workspace_view_builder = WorkspaceViewBuilder::new(self.workspace_id.clone(), self.uid); let mut templates: Vec = vec![]; for handler in self.handlers.values() { if let Ok(template) = handler - .create_workspace_view(self.uid, workspace_view_builder.clone()) + .create_workspace_view(self.uid, &mut workspace_view_builder) .await { templates.extend(template); } } - let views = workspace_view_builder.write().await.build(); + let views = workspace_view_builder.build(); // Safe to unwrap because we have at least one view. 
let first_view = views.first().unwrap().parent_view.clone(); let first_level_views = views diff --git a/libs/workspace-template/src/tests/getting_started_tests.rs b/libs/workspace-template/src/tests/getting_started_tests.rs index 1a83f8ae4..150b39c80 100644 --- a/libs/workspace-template/src/tests/getting_started_tests.rs +++ b/libs/workspace-template/src/tests/getting_started_tests.rs @@ -1,7 +1,5 @@ use std::collections::HashMap; -use std::sync::Arc; -use collab::lock::RwLock; use collab::preclude::uuid_v4; use collab_database::database::DatabaseData; use collab_database::entity::CreateDatabaseParams; @@ -95,18 +93,17 @@ mod tests { #[tokio::test] async fn create_workspace_view_with_getting_started_template_test() { let template = GettingStartedTemplate; - let workspace_view_builder = Arc::new(RwLock::new(WorkspaceViewBuilder::new(generate_id(), 1))); + let mut workspace_view_builder = WorkspaceViewBuilder::new(generate_id(), 1); let result = template - .create_workspace_view(1, workspace_view_builder.clone()) + .create_workspace_view(1, &mut workspace_view_builder) .await .unwrap(); // 2 spaces + 3 documents + 1 database + 5 database rows assert_eq!(result.len(), 11); - let mut builder = workspace_view_builder.write().await; - let views = builder.build(); + let views = workspace_view_builder.build(); // check the number of spaces assert_eq!(views.len(), 2); diff --git a/services/appflowy-worker/Cargo.toml b/services/appflowy-worker/Cargo.toml index 057bb97fe..307f5a6d3 100644 --- a/services/appflowy-worker/Cargo.toml +++ b/services/appflowy-worker/Cargo.toml @@ -46,3 +46,4 @@ mime_guess = "2.0" bytes.workspace = true uuid.workspace = true + diff --git a/services/appflowy-worker/src/application.rs b/services/appflowy-worker/src/application.rs index d237cdfb1..b1094256c 100644 --- a/services/appflowy-worker/src/application.rs +++ b/services/appflowy-worker/src/application.rs @@ -97,7 +97,7 @@ pub async fn create_app(listener: TcpListener, config: Config) -> 
Result<(), Err Arc::new(state.s3_client.clone()), Arc::new(email_notifier), "import_task_stream", - 30, + 10, )); let app = Router::new().with_state(state); diff --git a/services/appflowy-worker/src/import_worker/unzip.rs b/services/appflowy-worker/src/import_worker/unzip.rs index b873576d6..c34ba7941 100644 --- a/services/appflowy-worker/src/import_worker/unzip.rs +++ b/services/appflowy-worker/src/import_worker/unzip.rs @@ -1,13 +1,16 @@ -use anyhow::Result; +use anyhow::{Context, Result}; use async_zip::base::read::stream::{Ready, ZipFileReader}; use async_zip::{StringEncoding, ZipString}; use futures::io::{AsyncBufRead, AsyncReadExt}; use std::ffi::OsString; use std::os::unix::ffi::OsStringExt; use std::path::PathBuf; -use tokio::fs::{self, File}; +use tokio::fs; +use tokio::fs::File; use tokio::io::AsyncWriteExt; +use tracing::error; + pub struct UnzipFile { pub file_name: String, pub unzip_dir_path: PathBuf, @@ -15,41 +18,69 @@ pub struct UnzipFile { pub async fn unzip_async( mut zip_reader: ZipFileReader>, - out: PathBuf, + out_dir: PathBuf, ) -> Result { let mut real_file_name = None; while let Some(mut next_reader) = zip_reader.next_with_entry().await? 
{ let entry_reader = next_reader.reader_mut(); - let filename = get_filename(entry_reader.entry().filename())?; + let filename = get_filename(entry_reader.entry().filename()) + .with_context(|| "Failed to extract filename from entry".to_string())?; + // Save the root folder name if we haven't done so yet if real_file_name.is_none() && filename.ends_with('/') { real_file_name = Some(filename.split('/').next().unwrap_or(&filename).to_string()); } - let output_path = out.join(&filename); + let output_path = out_dir.join(&filename); if filename.ends_with('/') { - fs::create_dir_all(&output_path).await?; + fs::create_dir_all(&output_path) + .await + .with_context(|| format!("Failed to create directory: {}", output_path.display()))?; } else { + // Ensure parent directories exist if let Some(parent) = output_path.parent() { if !parent.exists() { - fs::create_dir_all(parent).await?; + fs::create_dir_all(parent) + .await + .with_context(|| format!("Failed to create parent directory: {}", parent.display()))?; } } - let mut outfile = File::create(&output_path).await?; + // Write file contents + let mut outfile = File::create(&output_path) + .await + .with_context(|| format!("Failed to create file: {}", output_path.display()))?; let mut buffer = vec![]; - entry_reader.read_to_end(&mut buffer).await?; - outfile.write_all(&buffer).await?; + match entry_reader.read_to_end(&mut buffer).await { + Ok(_) => { + outfile + .write_all(&buffer) + .await + .with_context(|| format!("Failed to write data to file: {}", output_path.display()))?; + }, + Err(err) => { + error!( + "Failed to read entry: {:?}. 
Error: {:?}", + entry_reader.entry(), + err, + ); + return Err(anyhow::anyhow!( + "Unexpected EOF while reading: {}", + filename + )); + }, + } } + // Move to the next file in the zip zip_reader = next_reader.done().await?; } match real_file_name { - None => Err(anyhow::anyhow!("No files found in zip archive")), + None => Err(anyhow::anyhow!("No files found in the zip archive")), Some(file_name) => Ok(UnzipFile { file_name: file_name.clone(), - unzip_dir_path: out.join(file_name), + unzip_dir_path: out_dir.join(file_name), }), } } diff --git a/services/appflowy-worker/src/import_worker/worker.rs b/services/appflowy-worker/src/import_worker/worker.rs index b426a5674..70796bd5a 100644 --- a/services/appflowy-worker/src/import_worker/worker.rs +++ b/services/appflowy-worker/src/import_worker/worker.rs @@ -1,15 +1,15 @@ use crate::error::ImportError; +use crate::import_worker::report::{ImportNotifier, ImportProgress, ImportResultBuilder}; use crate::import_worker::unzip::unzip_async; use crate::s3_client::S3StreamResponse; use anyhow::anyhow; use async_zip::base::read::stream::ZipFileReader; use aws_sdk_s3::primitives::ByteStream; + use bytes::Bytes; use collab::core::origin::CollabOrigin; use collab::entity::EncodedCollab; - use collab_database::workspace_database::WorkspaceDatabaseBody; - use collab_entity::CollabType; use collab_folder::Folder; use collab_importer::imported_collab::ImportType; @@ -17,6 +17,11 @@ use collab_importer::notion::page::CollabResource; use collab_importer::notion::NotionImporter; use collab_importer::util::FileId; use database::collab::{insert_into_af_collab_bulk_for_user, select_blob_from_af_collab}; +use database::workspace::{ + select_workspace_database_storage_id, update_import_task_status, update_workspace_status, +}; +use database_entity::dto::CollabParams; +use futures::io::BufReader; use futures::stream::FuturesUnordered; use futures::{stream, StreamExt}; use redis::aio::ConnectionManager; @@ -40,16 +45,11 @@ use std::sync::Arc; 
use std::time::Duration; use tokio::fs; -use crate::import_worker::report::{ImportNotifier, ImportProgress, ImportResultBuilder}; -use database::workspace::{ - select_workspace_database_storage_id, update_import_task_status, update_workspace_status, -}; -use database_entity::dto::CollabParams; - use crate::s3_client::S3Client; use database::collab::mem_cache::{cache_exp_secs_from_collab_type, CollabMemCache}; use tokio::task::spawn_local; use tokio::time::interval; + use tracing::{error, info, trace, warn}; use uuid::Uuid; @@ -216,6 +216,8 @@ async fn consume_task( notifier: Arc, ) -> Result<(), ImportError> { let result = process_task(import_task, s3_client, redis_client, pg_pool, notifier).await; + + // Each task will be consumed only once, regardless of success or failure. let _: () = redis_client .xack(stream_name, group_name, &[entry_id]) .await @@ -234,39 +236,32 @@ async fn process_task( pg_pool: &PgPool, notifier: Arc, ) -> Result<(), ImportError> { - trace!("Processing task: {}", import_task); + trace!("[Import]: Processing task: {}", import_task); match import_task { ImportTask::Notion(task) => { - let result = async { - // 1. unzip file to temp dir - let unzip_dir_path = download_zip_file(&task, s3_client).await?; - // 2. import zip - let result = - process_unzip_file(&task, &unzip_dir_path, pg_pool, redis_client, s3_client).await; - - // 3. delete zip file regardless of success or failure - match fs::remove_dir_all(unzip_dir_path).await { - Ok(_) => trace!("[Import]: {} deleted unzip file", task.workspace_id), - Err(err) => error!("Failed to delete unzip file: {:?}", err), - } - result - } - .await; - - // 4. remove file from S3 - if let Err(err) = s3_client.delete_blob(task.s3_key.as_str()).await { - error!("Failed to delete zip file from S3: {:?}", err); + // 1. download zip file + match download_and_unzip_file(&task, s3_client).await { + Ok(unzip_dir_path) => { + // 2. 
process unzip file + let result = + process_unzip_file(&task, &unzip_dir_path, pg_pool, redis_client, s3_client).await; + + // 3. remove file from S3 + if let Err(err) = s3_client.delete_blob(task.s3_key.as_str()).await { + error!("Failed to delete zip file from S3: {:?}", err); + } + notify_user(&task, result, notifier).await?; + }, + Err(err) => { + // If there is any errors when download or unzip the file, we will remove the file from S3 and notify the user. + if let Err(err) = s3_client.delete_blob(task.s3_key.as_str()).await { + error!("Failed to delete zip file from S3: {:?}", err); + } + notify_user(&task, Err(err), notifier).await?; + }, } - // 5. notify import result - trace!( - "[Import]: {}:{} import result: {:?}", - task.workspace_id, - task.task_id, - result - ); - notify_user(&task, result, notifier).await?; Ok(()) }, ImportTask::Custom(value) => { @@ -287,19 +282,22 @@ async fn process_task( } } -async fn download_zip_file( +async fn download_and_unzip_file( import_task: &NotionImportTask, s3_client: &Arc, ) -> Result { let S3StreamResponse { stream, content_type: _, + content_length, } = s3_client - .get_blob(import_task.s3_key.as_str()) + .get_blob_stream(import_task.s3_key.as_str()) .await .map_err(|err| ImportError::Internal(err.into()))?; + let buffer_size = buffer_size_from_content_length(content_length); + let reader = BufReader::with_capacity(buffer_size, stream); + let zip_reader = ZipFileReader::new(reader); - let zip_reader = ZipFileReader::new(stream); let unique_file_name = uuid::Uuid::new_v4().to_string(); let output_file_path = temp_dir().join(unique_file_name); fs::create_dir_all(&output_file_path) @@ -318,6 +316,26 @@ async fn download_zip_file( Ok(unzip_file.unzip_dir_path) } +/// Determines the buffer size based on the content length of the file. +/// If the buffer is too small, the zip reader will frequently pause to fetch more data, +/// causing delays. 
This can make the unzip process appear slower and can even cause premature +/// errors (like EOF) if there is a delay in fetching more data. +#[inline] +fn buffer_size_from_content_length(content_length: Option) -> usize { + match content_length { + Some(file_size) => { + if file_size < 10 * 1024 * 1024 { + 1024 * 1024 // 1MB buffer + } else if file_size < 100 * 1024 * 1024 { + 5 * 1024 * 1024 // 5MB buffer + } else { + 10 * 1024 * 1024 // 10MB buffer + } + }, + None => 1024 * 1024, + } +} + async fn process_unzip_file( import_task: &NotionImportTask, unzip_dir_path: &PathBuf, @@ -562,14 +580,31 @@ async fn process_unzip_file( .await .map_err(|err| ImportError::Internal(anyhow!("Failed to upload files to S3: {:?}", err)))?; + // 3. delete zip file regardless of success or failure + match fs::remove_dir_all(unzip_dir_path).await { + Ok(_) => trace!("[Import]: {} deleted unzip file", import_task.workspace_id), + Err(err) => error!("Failed to delete unzip file: {:?}", err), + } + Ok(()) } async fn notify_user( - _import_task: &NotionImportTask, - _result: Result<(), ImportError>, + import_task: &NotionImportTask, + result: Result<(), ImportError>, _notifier: Arc, ) -> Result<(), ImportError> { + match result { + Ok(_) => { + trace!("[Import]: successfully imported:{}", import_task); + }, + Err(err) => { + error!( + "[Import]: failed to import:{}: error:{:?}", + import_task, err + ); + }, + } // send email Ok(()) } @@ -735,6 +770,16 @@ pub struct NotionImportTask { pub s3_key: String, pub host: String, } +impl Display for NotionImportTask { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "NotionImportTask {{ workspace_id: {}, workspace_name: {} }}", + self.workspace_id, self.workspace_name + ) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum ImportTask { diff --git a/services/appflowy-worker/src/s3_client.rs b/services/appflowy-worker/src/s3_client.rs index 30af20fce..d5bd8db4c 
100644 --- a/services/appflowy-worker/src/s3_client.rs +++ b/services/appflowy-worker/src/s3_client.rs @@ -10,7 +10,7 @@ use tokio_util::compat::TokioAsyncReadCompatExt; #[async_trait] pub trait S3Client: Send + Sync { - async fn get_blob(&self, object_key: &str) -> Result; + async fn get_blob_stream(&self, object_key: &str) -> Result; async fn put_blob( &self, object_key: &str, @@ -36,7 +36,7 @@ impl Deref for S3ClientImpl { #[async_trait] impl S3Client for S3ClientImpl { - async fn get_blob(&self, object_key: &str) -> Result { + async fn get_blob_stream(&self, object_key: &str) -> Result { match self .inner .get_object() @@ -48,9 +48,11 @@ impl S3Client for S3ClientImpl { Ok(output) => { let stream = output.body.into_async_read().compat(); let content_type = output.content_type; + let content_length = output.content_length; Ok(S3StreamResponse { stream: Box::new(stream), content_type, + content_length, }) }, Err(SdkError::ServiceError(service_err)) => match service_err.err() { @@ -118,4 +120,5 @@ impl S3Client for S3ClientImpl { pub struct S3StreamResponse { pub stream: Box, pub content_type: Option, + pub content_length: Option, } diff --git a/services/appflowy-worker/tests/import_test.rs b/services/appflowy-worker/tests/import_test.rs index 5651767be..aff333aac 100644 --- a/services/appflowy-worker/tests/import_test.rs +++ b/services/appflowy-worker/tests/import_test.rs @@ -203,7 +203,7 @@ struct MockS3Client; #[async_trait] impl S3Client for MockS3Client { - async fn get_blob(&self, _object_key: &str) -> Result { + async fn get_blob_stream(&self, _object_key: &str) -> Result { todo!() }