diff --git a/Cargo.lock b/Cargo.lock
index a43f1a1b3..3176237c3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2230,7 +2230,7 @@ dependencies = [
 [[package]]
 name = "collab"
 version = "0.2.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=f882a1720f7b35a874ea276e604434f0b5b04698#f882a1720f7b35a874ea276e604434f0b5b04698"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2934851d337c7d322c5dd7a944c05cb7ffb930d4#2934851d337c7d322c5dd7a944c05cb7ffb930d4"
 dependencies = [
  "anyhow",
  "arc-swap",
@@ -2255,7 +2255,7 @@ dependencies = [
 [[package]]
 name = "collab-database"
 version = "0.2.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=f882a1720f7b35a874ea276e604434f0b5b04698#f882a1720f7b35a874ea276e604434f0b5b04698"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2934851d337c7d322c5dd7a944c05cb7ffb930d4#2934851d337c7d322c5dd7a944c05cb7ffb930d4"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -2294,7 +2294,7 @@ dependencies = [
 [[package]]
 name = "collab-document"
 version = "0.2.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=f882a1720f7b35a874ea276e604434f0b5b04698#f882a1720f7b35a874ea276e604434f0b5b04698"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2934851d337c7d322c5dd7a944c05cb7ffb930d4#2934851d337c7d322c5dd7a944c05cb7ffb930d4"
 dependencies = [
  "anyhow",
  "arc-swap",
@@ -2315,7 +2315,7 @@ dependencies = [
 [[package]]
 name = "collab-entity"
 version = "0.2.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=f882a1720f7b35a874ea276e604434f0b5b04698#f882a1720f7b35a874ea276e604434f0b5b04698"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2934851d337c7d322c5dd7a944c05cb7ffb930d4#2934851d337c7d322c5dd7a944c05cb7ffb930d4"
 dependencies = [
  "anyhow",
  "bytes",
@@ -2335,7 +2335,7 @@ dependencies = [
 [[package]]
 name = "collab-folder"
 version = "0.2.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=f882a1720f7b35a874ea276e604434f0b5b04698#f882a1720f7b35a874ea276e604434f0b5b04698"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2934851d337c7d322c5dd7a944c05cb7ffb930d4#2934851d337c7d322c5dd7a944c05cb7ffb930d4"
 dependencies = [
  "anyhow",
  "arc-swap",
@@ -2357,7 +2357,7 @@ dependencies = [
 [[package]]
 name = "collab-importer"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=f882a1720f7b35a874ea276e604434f0b5b04698#f882a1720f7b35a874ea276e604434f0b5b04698"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2934851d337c7d322c5dd7a944c05cb7ffb930d4#2934851d337c7d322c5dd7a944c05cb7ffb930d4"
 dependencies = [
  "anyhow",
  "async-recursion",
@@ -2371,6 +2371,7 @@ dependencies = [
  "collab-folder",
  "fancy-regex 0.13.0",
  "futures",
+ "futures-util",
  "fxhash",
  "hex",
  "markdown",
@@ -2454,7 +2455,7 @@ dependencies = [
 [[package]]
 name = "collab-user"
 version = "0.2.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=f882a1720f7b35a874ea276e604434f0b5b04698#f882a1720f7b35a874ea276e604434f0b5b04698"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2934851d337c7d322c5dd7a944c05cb7ffb930d4#2934851d337c7d322c5dd7a944c05cb7ffb930d4"
 dependencies = [
  "anyhow",
  "collab",
diff --git a/Cargo.toml b/Cargo.toml
index b79922403..de5fa657d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -293,13 +293,13 @@ debug = true
 [patch.crates-io]
 # It's difficult to resolve different versions of the same crate used in the AppFlowy Frontend and the Client-API crate.
 # So we use [patch] to work around this issue.
-collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "f882a1720f7b35a874ea276e604434f0b5b04698" }
-collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "f882a1720f7b35a874ea276e604434f0b5b04698" }
-collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "f882a1720f7b35a874ea276e604434f0b5b04698" }
-collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "f882a1720f7b35a874ea276e604434f0b5b04698" }
-collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "f882a1720f7b35a874ea276e604434f0b5b04698" }
-collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "f882a1720f7b35a874ea276e604434f0b5b04698" }
-collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "f882a1720f7b35a874ea276e604434f0b5b04698" }
+collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2934851d337c7d322c5dd7a944c05cb7ffb930d4" }
+collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2934851d337c7d322c5dd7a944c05cb7ffb930d4" }
+collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2934851d337c7d322c5dd7a944c05cb7ffb930d4" }
+collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2934851d337c7d322c5dd7a944c05cb7ffb930d4" }
+collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2934851d337c7d322c5dd7a944c05cb7ffb930d4" }
+collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2934851d337c7d322c5dd7a944c05cb7ffb930d4" }
+collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2934851d337c7d322c5dd7a944c05cb7ffb930d4" }
 
 [features]
 history = []
diff --git a/libs/client-api/src/native/http_native.rs b/libs/client-api/src/native/http_native.rs
index 4274d9729..a211aeb3c 100644
--- a/libs/client-api/src/native/http_native.rs
+++ b/libs/client-api/src/native/http_native.rs
@@ -298,10 +298,38 @@ impl Client {
     AppResponse::<()>::from_response(resp).await?.into_error()
   }
 
+  /// Sends a POST request to import a file to the server.
+  ///
+  /// This function streams the contents of the file located at the provided `file_path`
+  /// as part of a multipart form data request to the server's `/api/import` endpoint.
+  ///
+  /// ### HTTP Request Details:
+  ///
+  /// - **Method:** POST
+  /// - **URL:** `{base_url}/api/import`
+  ///   - The `base_url` is dynamically provided and appended with `/api/import`.
+  ///
+  /// - **Headers:**
+  ///   - `X-Host`: The value of the `base_url` is sent as the host header.
+  ///   - `X-Content-Length`: The size of the file, in bytes, taken from the file's metadata.
+  ///
+  /// - **Multipart Form:**
+  ///   - The file is sent as a multipart form part:
+  ///     - **Field Name:** The file name derived from the file path, or a UUID if unavailable.
+  ///     - **File Content:** The file's content is streamed using `reqwest::Body::wrap_stream`.
+  ///     - **MIME Type:** Guessed from the file's extension using the `mime_guess` crate,
+  ///       defaulting to `application/octet-stream` if undetermined.
+  ///
+  /// ### Parameters:
+  /// - `file_path`: The path to the file to be uploaded.
+  ///   - The file is opened asynchronously and its metadata (such as size) is extracted.
+  ///   - The MIME type is automatically determined based on the file extension using `mime_guess`.
+  ///
   pub async fn import_file(&self, file_path: &Path) -> Result<(), AppResponseError> {
     let file = File::open(&file_path).await?;
+    let metadata = file.metadata().await?;
     let file_name = file_path
-      .file_name()
+      .file_stem()
       .map(|s| s.to_string_lossy().to_string())
       .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
@@ -311,10 +339,10 @@ impl Client {
       .to_string();
 
     let file_part = multipart::Part::stream(reqwest::Body::wrap_stream(stream))
-      .file_name(file_name)
+      .file_name(file_name.clone())
       .mime_str(&mime)?;
 
-    let form = multipart::Form::new().part("file", file_part);
+    let form = multipart::Form::new().part(file_name, file_part);
     let url = format!("{}/api/import", self.base_url);
     let mut builder = self
       .http_client_with_auth(Method::POST, &url)
       .await?
       .multipart(form);
 
     // set the host header
-    builder = builder.header("X-Host", self.base_url.clone());
+    builder = builder
+      .header("X-Host", self.base_url.clone())
+      .header("X-Content-Length", metadata.len());
     let resp = builder.send().await?;
 
     AppResponse::<()>::from_response(resp).await?.into_error()
diff --git a/libs/workspace-template/src/database/database_collab.rs b/libs/workspace-template/src/database/database_collab.rs
index 28d0b1449..214e0425d 100644
--- a/libs/workspace-template/src/database/database_collab.rs
+++ b/libs/workspace-template/src/database/database_collab.rs
@@ -12,8 +12,8 @@ pub async fn create_database_collab(
     collab_service,
     notifier: Default::default(),
   };
-  Database::create_with_view(params, context)
-    .await?
+  let database = Database::create_with_view(params, context).await?;
+  database
     .encode_database_collabs()
     .await
     .map_err(|e| anyhow::anyhow!("Failed to encode database collabs: {:?}", e))
diff --git a/libs/workspace-template/src/document/getting_started.rs b/libs/workspace-template/src/document/getting_started.rs
index 23f0c3b9b..f175530df 100644
--- a/libs/workspace-template/src/document/getting_started.rs
+++ b/libs/workspace-template/src/document/getting_started.rs
@@ -1,10 +1,9 @@
 use std::collections::HashMap;
-use std::sync::Arc;
 
 use anyhow::Error;
 use async_trait::async_trait;
 use collab::core::origin::CollabOrigin;
-use collab::lock::RwLock;
+
 use collab::preclude::Collab;
 use collab_database::database::{timestamp, DatabaseData};
 use collab_database::entity::CreateDatabaseParams;
@@ -66,12 +65,7 @@ impl GettingStartedTemplate {
     create_database_params: CreateDatabaseParams,
   ) -> anyhow::Result<Vec<TemplateData>> {
     let database_id = create_database_params.database_id.clone();
-    let encoded_database = tokio::task::spawn_blocking({
-      let create_database_params = create_database_params.clone();
-      move || create_database_collab(create_database_params)
-    })
-    .await?
-      .await?;
+    let encoded_database = create_database_collab(create_database_params).await?;
 
     let encoded_database_collab = encoded_database
       .encoded_database_collab
@@ -236,7 +230,7 @@ impl WorkspaceTemplate for GettingStartedTemplate {
   async fn create_workspace_view(
     &self,
     _uid: i64,
-    workspace_view_builder: Arc<RwLock<WorkspaceViewBuilder>>,
+    workspace_view_builder: &mut WorkspaceViewBuilder,
   ) -> anyhow::Result<Vec<TemplateData>> {
     let general_view_uuid = gen_view_id().to_string();
     let shared_view_uuid = gen_view_id().to_string();
@@ -263,12 +257,10 @@ impl WorkspaceTemplate for GettingStartedTemplate {
       )
       .await?;
 
-    let mut builder = workspace_view_builder.write().await;
-
     // Create general space with 2 built-in views: Getting started, To-dos
     // The Getting started view is a document view, and the To-dos view is a board view
     // The Getting started view contains 2 sub views: Desktop guide, Mobile guide
-    builder
+    workspace_view_builder
       .with_view_builder(|view_builder| async {
         let created_at = timestamp();
         let mut view_builder = view_builder
@@ -305,7 +297,7 @@ impl WorkspaceTemplate for GettingStartedTemplate {
       .await;
 
     // Create shared space without any built-in views
-    builder
+    workspace_view_builder
       .with_view_builder(|view_builder| async {
         let created_at = timestamp();
         let view_builder = view_builder
@@ -371,12 +363,11 @@ impl WorkspaceTemplate for DocumentTemplate {
   async fn create_workspace_view(
     &self,
     _uid: i64,
-    workspace_view_builder: Arc<RwLock<WorkspaceViewBuilder>>,
+    workspace_view_builder: &mut WorkspaceViewBuilder,
   ) -> anyhow::Result<Vec<TemplateData>> {
     let view_id = gen_view_id().to_string();
 
-    let mut builder = workspace_view_builder.write().await;
-    builder
+    workspace_view_builder
       .with_view_builder(|view_builder| async {
         view_builder
           .with_name("Getting started")
diff --git a/libs/workspace-template/src/lib.rs b/libs/workspace-template/src/lib.rs
index 30c3a69f1..c85b94cf8 100644
--- a/libs/workspace-template/src/lib.rs
+++ b/libs/workspace-template/src/lib.rs
@@ -5,7 +5,7 @@ pub use anyhow::Result;
 use async_trait::async_trait;
 use collab::core::origin::CollabOrigin;
 use collab::entity::EncodedCollab;
-use collab::lock::RwLock;
+
 use collab::preclude::Collab;
 use collab_entity::CollabType;
 use collab_folder::{
@@ -30,7 +30,7 @@ pub trait WorkspaceTemplate {
   async fn create_workspace_view(
     &self,
     uid: i64,
-    workspace_view_builder: Arc<RwLock<WorkspaceViewBuilder>>,
+    workspace_view_builder: &mut WorkspaceViewBuilder,
   ) -> Result<Vec<TemplateData>>;
 }
 
@@ -91,22 +91,18 @@ impl WorkspaceTemplateBuilder {
   }
 
   pub async fn build(&self) -> Result<Vec<TemplateData>> {
-    let workspace_view_builder = Arc::new(RwLock::from(WorkspaceViewBuilder::new(
-      self.workspace_id.clone(),
-      self.uid,
-    )));
-
+    let mut workspace_view_builder = WorkspaceViewBuilder::new(self.workspace_id.clone(), self.uid);
     let mut templates: Vec<TemplateData> = vec![];
     for handler in self.handlers.values() {
       if let Ok(template) = handler
-        .create_workspace_view(self.uid, workspace_view_builder.clone())
+        .create_workspace_view(self.uid, &mut workspace_view_builder)
         .await
       {
         templates.extend(template);
       }
     }
 
-    let views = workspace_view_builder.write().await.build();
+    let views = workspace_view_builder.build();
     // Safe to unwrap because we have at least one view.
     let first_view = views.first().unwrap().parent_view.clone();
     let first_level_views = views
diff --git a/libs/workspace-template/src/tests/getting_started_tests.rs b/libs/workspace-template/src/tests/getting_started_tests.rs
index 1a83f8ae4..150b39c80 100644
--- a/libs/workspace-template/src/tests/getting_started_tests.rs
+++ b/libs/workspace-template/src/tests/getting_started_tests.rs
@@ -1,7 +1,5 @@
 use std::collections::HashMap;
-use std::sync::Arc;
 
-use collab::lock::RwLock;
 use collab::preclude::uuid_v4;
 use collab_database::database::DatabaseData;
 use collab_database::entity::CreateDatabaseParams;
@@ -95,18 +93,17 @@ mod tests {
   #[tokio::test]
   async fn create_workspace_view_with_getting_started_template_test() {
     let template = GettingStartedTemplate;
-    let workspace_view_builder = Arc::new(RwLock::new(WorkspaceViewBuilder::new(generate_id(), 1)));
+    let mut workspace_view_builder = WorkspaceViewBuilder::new(generate_id(), 1);
 
     let result = template
-      .create_workspace_view(1, workspace_view_builder.clone())
+      .create_workspace_view(1, &mut workspace_view_builder)
       .await
       .unwrap();
 
     // 2 spaces + 3 documents + 1 database + 5 database rows
     assert_eq!(result.len(), 11);
 
-    let mut builder = workspace_view_builder.write().await;
-    let views = builder.build();
+    let views = workspace_view_builder.build();
 
     // check the number of spaces
     assert_eq!(views.len(), 2);
diff --git a/services/appflowy-worker/Cargo.toml b/services/appflowy-worker/Cargo.toml
index 057bb97fe..307f5a6d3 100644
--- a/services/appflowy-worker/Cargo.toml
+++ b/services/appflowy-worker/Cargo.toml
@@ -46,3 +46,4 @@ mime_guess = "2.0"
 
 bytes.workspace = true
 uuid.workspace = true
+
diff --git a/services/appflowy-worker/src/application.rs b/services/appflowy-worker/src/application.rs
index d237cdfb1..b1094256c 100644
--- a/services/appflowy-worker/src/application.rs
+++ b/services/appflowy-worker/src/application.rs
@@ -97,7 +97,7 @@ pub async fn create_app(listener: TcpListener, config: Config) -> Result<(), Error> {
     Arc::new(state.s3_client.clone()),
     Arc::new(email_notifier),
     "import_task_stream",
-    30,
+    10,
   ));
 
   let app = Router::new().with_state(state);
diff --git a/services/appflowy-worker/src/import_worker/unzip.rs b/services/appflowy-worker/src/import_worker/unzip.rs
index 6f5d91690..c34ba7941 100644
--- a/services/appflowy-worker/src/import_worker/unzip.rs
+++ b/services/appflowy-worker/src/import_worker/unzip.rs
@@ -1,10 +1,16 @@
-use anyhow::Result;
+use anyhow::{Context, Result};
 use async_zip::base::read::stream::{Ready, ZipFileReader};
+use async_zip::{StringEncoding, ZipString};
 use futures::io::{AsyncBufRead, AsyncReadExt};
+use std::ffi::OsString;
+use std::os::unix::ffi::OsStringExt;
 use std::path::PathBuf;
-use tokio::fs::{self, File};
+use tokio::fs;
+use tokio::fs::File;
 use tokio::io::AsyncWriteExt;
+use tracing::error;
+
 pub struct UnzipFile {
   pub file_name: String,
   pub unzip_dir_path: PathBuf,
@@ -12,41 +18,84 @@ pub struct UnzipFile {
 
 pub async fn unzip_async<R: AsyncBufRead + Unpin>(
   mut zip_reader: ZipFileReader<Ready<R>>,
-  out: PathBuf,
+  out_dir: PathBuf,
 ) -> Result<UnzipFile> {
   let mut real_file_name = None;
   while let Some(mut next_reader) = zip_reader.next_with_entry().await? {
     let entry_reader = next_reader.reader_mut();
-    let filename = entry_reader.entry().filename().as_str()?;
+    let filename = get_filename(entry_reader.entry().filename())
+      .with_context(|| "Failed to extract filename from entry".to_string())?;
+
     // Save the root folder name if we haven't done so yet
     if real_file_name.is_none() && filename.ends_with('/') {
-      real_file_name = Some(filename.split('/').next().unwrap_or(filename).to_string());
+      real_file_name = Some(filename.split('/').next().unwrap_or(&filename).to_string());
     }
 
-    let output_path = out.join(filename);
+    let output_path = out_dir.join(&filename);
     if filename.ends_with('/') {
-      fs::create_dir_all(&output_path).await?;
+      fs::create_dir_all(&output_path)
+        .await
+        .with_context(|| format!("Failed to create directory: {}", output_path.display()))?;
     } else {
+      // Ensure parent directories exist
      if let Some(parent) = output_path.parent() {
        if !parent.exists() {
-          fs::create_dir_all(parent).await?;
+          fs::create_dir_all(parent)
+            .await
+            .with_context(|| format!("Failed to create parent directory: {}", parent.display()))?;
        }
      }
 
-      let mut outfile = File::create(&output_path).await?;
+      // Write file contents
+      let mut outfile = File::create(&output_path)
+        .await
+        .with_context(|| format!("Failed to create file: {}", output_path.display()))?;
       let mut buffer = vec![];
-      entry_reader.read_to_end(&mut buffer).await?;
-      outfile.write_all(&buffer).await?;
+      match entry_reader.read_to_end(&mut buffer).await {
+        Ok(_) => {
+          outfile
+            .write_all(&buffer)
+            .await
+            .with_context(|| format!("Failed to write data to file: {}", output_path.display()))?;
+        },
+        Err(err) => {
+          error!(
+            "Failed to read entry: {:?}. Error: {:?}",
+            entry_reader.entry(),
+            err,
+          );
+          return Err(anyhow::anyhow!(
+            "Unexpected EOF while reading: {}",
+            filename
+          ));
+        },
+      }
     }
 
+    // Move to the next file in the zip
     zip_reader = next_reader.done().await?;
   }
 
   match real_file_name {
-    None => Err(anyhow::anyhow!("No files found in zip archive")),
+    None => Err(anyhow::anyhow!("No files found in the zip archive")),
     Some(file_name) => Ok(UnzipFile {
       file_name: file_name.clone(),
-      unzip_dir_path: out.join(file_name),
+      unzip_dir_path: out_dir.join(file_name),
    }),
  }
 }
+
+/// Extracts a filename from a `ZipString`, falling back to a lossy conversion for raw-encoded names.
+pub fn get_filename(zip_string: &ZipString) -> Result<String> {
+  match zip_string.encoding() {
+    StringEncoding::Utf8 => match zip_string.as_str() {
+      Ok(valid_str) => Ok(valid_str.to_string()),
+      Err(err) => Err(err.into()),
+    },
+
+    StringEncoding::Raw => {
+      let raw_bytes = zip_string.as_bytes();
+      let os_string = OsString::from_vec(raw_bytes.to_vec());
+      Ok(os_string.to_string_lossy().into_owned())
+    },
+  }
+}
diff --git a/services/appflowy-worker/src/import_worker/worker.rs b/services/appflowy-worker/src/import_worker/worker.rs
index 1dfd8c0e1..55321a72f 100644
--- a/services/appflowy-worker/src/import_worker/worker.rs
+++ b/services/appflowy-worker/src/import_worker/worker.rs
@@ -1,15 +1,15 @@
 use crate::error::ImportError;
+use crate::import_worker::report::{ImportNotifier, ImportProgress, ImportResultBuilder};
 use crate::import_worker::unzip::unzip_async;
 use crate::s3_client::S3StreamResponse;
 use anyhow::anyhow;
 use async_zip::base::read::stream::ZipFileReader;
 use aws_sdk_s3::primitives::ByteStream;
+
 use bytes::Bytes;
 use collab::core::origin::CollabOrigin;
 use collab::entity::EncodedCollab;
-
 use collab_database::workspace_database::WorkspaceDatabaseBody;
-
 use collab_entity::CollabType;
 use collab_folder::Folder;
 use collab_importer::imported_collab::ImportType;
@@ -17,6 +17,11 @@ use collab_importer::notion::page::CollabResource;
 use collab_importer::notion::NotionImporter;
 use collab_importer::util::FileId;
 use database::collab::{insert_into_af_collab_bulk_for_user, select_blob_from_af_collab};
+use database::workspace::{
+  select_workspace_database_storage_id, update_import_task_status, update_workspace_status,
+};
+use database_entity::dto::CollabParams;
+use futures::io::BufReader;
 use futures::stream::FuturesUnordered;
 use futures::{stream, StreamExt};
 use redis::aio::ConnectionManager;
@@ -31,6 +36,7 @@ use sqlx::types::chrono;
 use sqlx::{PgPool, Pool, Postgres};
 use std::collections::HashMap;
 use std::env::temp_dir;
+use std::fmt::Display;
 use std::fs::Permissions;
 use std::ops::DerefMut;
 use std::os::unix::fs::PermissionsExt;
@@ -39,16 +45,11 @@ use std::sync::Arc;
 use std::time::Duration;
 use tokio::fs;
 
-use crate::import_worker::report::{ImportNotifier, ImportProgress, ImportResultBuilder};
-use database::workspace::{
-  select_workspace_database_storage_id, update_import_task_status, update_workspace_status,
-};
-use database_entity::dto::CollabParams;
-
 use crate::s3_client::S3Client;
 use database::collab::mem_cache::{cache_exp_secs_from_collab_type, CollabMemCache};
 use tokio::task::spawn_local;
 use tokio::time::interval;
+
 use tracing::{error, info, trace, warn};
 use uuid::Uuid;
 
@@ -214,7 +215,9 @@ async fn consume_task(
   pg_pool: &Pool<Postgres>,
   notifier: Arc<dyn ImportNotifier>,
 ) -> Result<(), ImportError> {
-  process_task(import_task, s3_client, redis_client, pg_pool, notifier).await?;
+  let result = process_task(import_task, s3_client, redis_client, pg_pool, notifier).await;
+
+  // Each task will be consumed only once, regardless of success or failure.
   let _: () = redis_client
     .xack(stream_name, group_name, &[entry_id])
     .await
@@ -222,7 +225,8 @@ async fn consume_task(
     .map_err(|e| {
       error!("Failed to acknowledge task: {:?}", e);
       ImportError::Internal(e.into())
     })?;
-  Ok::<_, ImportError>(())
+
+  result
 }
 
 async fn process_task(
   import_task: ImportTask,
   s3_client: &Arc<dyn S3Client>,
   redis_client: &mut ConnectionManager,
   pg_pool: &PgPool,
   notifier: Arc<dyn ImportNotifier>,
 ) -> Result<(), ImportError> {
-  trace!("Processing task: {:?}", import_task);
+  trace!("[Import]: Processing task: {}", import_task);
+
   match import_task {
     ImportTask::Notion(task) => {
-      // 1. unzip file to temp dir
-      let unzip_dir_path = download_zip_file(&task, s3_client).await?;
-      // 2. import zip
-      let result =
-        process_unzip_file(&task, &unzip_dir_path, pg_pool, redis_client, s3_client).await;
-      // 3. delete zip file regardless of success or failure
-      match fs::remove_dir_all(unzip_dir_path).await {
-        Ok(_) => trace!("[Import]: {} deleted unzip file", task.workspace_id),
-        Err(err) => error!("Failed to delete unzip file: {:?}", err),
-      }
-      // 4. notify import result
-      trace!(
-        "[Import]: {}:{} import result: {:?}",
-        task.workspace_id,
-        task.task_id,
-        result
-      );
-      notify_user(&task, result, notifier).await?;
-      // 5. remove file from S3
-      if let Err(err) = s3_client.delete_blob(task.s3_key.as_str()).await {
-        error!("Failed to delete zip file from S3: {:?}", err);
+      // 1. download and unzip the file
+      match download_and_unzip_file(&task, s3_client).await {
+        Ok(unzip_dir_path) => {
+          // 2. process the unzipped files
+          let result =
+            process_unzip_file(&task, &unzip_dir_path, pg_pool, redis_client, s3_client).await;
+
+          // 3. remove file from S3
+          if let Err(err) = s3_client.delete_blob(task.s3_key.as_str()).await {
+            error!("Failed to delete zip file from S3: {:?}", err);
+          }
+          notify_user(&task, result, notifier).await?;
+        },
+        Err(err) => {
+          // If downloading or unzipping fails, remove the file from S3 and notify the user.
+          if let Err(err) = s3_client.delete_blob(task.s3_key.as_str()).await {
+            error!("Failed to delete zip file from S3: {:?}", err);
+          }
+          notify_user(&task, Err(err), notifier).await?;
+        },
       }
+      Ok(())
     },
     ImportTask::Custom(value) => {
@@ -277,20 +282,23 @@ async fn process_task(
   }
 }
 
-async fn download_zip_file(
+async fn download_and_unzip_file(
   import_task: &NotionImportTask,
   s3_client: &Arc<dyn S3Client>,
 ) -> Result<PathBuf, ImportError> {
   let S3StreamResponse {
     stream,
     content_type: _,
+    content_length,
   } = s3_client
-    .get_blob(import_task.s3_key.as_str())
+    .get_blob_stream(import_task.s3_key.as_str())
     .await
     .map_err(|err| ImportError::Internal(err.into()))?;
 
+  let buffer_size = buffer_size_from_content_length(content_length);
+  let reader = BufReader::with_capacity(buffer_size, stream);
+  let zip_reader = ZipFileReader::new(reader);
-  let zip_reader = ZipFileReader::new(stream);
-  let unique_file_name = uuid::Uuid::new_v4().to_string();
+  let unique_file_name = Uuid::new_v4().to_string();
   let output_file_path = temp_dir().join(unique_file_name);
   fs::create_dir_all(&output_file_path)
     .await
@@ -308,6 +316,26 @@ async fn download_and_unzip_file(
   Ok(unzip_file.unzip_dir_path)
 }
 
+/// Determines the buffer size based on the content length of the file.
+/// If the buffer is too small, the zip reader will frequently pause to fetch more data,
+/// causing delays. This can make the unzip process appear slower and can even cause premature
+/// errors (like EOF) if there is a delay in fetching more data.
+#[inline]
+fn buffer_size_from_content_length(content_length: Option<i64>) -> usize {
+  match content_length {
+    Some(file_size) => {
+      if file_size < 10 * 1024 * 1024 {
+        3 * 1024 * 1024 // 3MB buffer
+      } else if file_size < 100 * 1024 * 1024 {
+        5 * 1024 * 1024 // 5MB buffer
+      } else {
+        10 * 1024 * 1024 // 10MB buffer
+      }
+    },
+    None => 3 * 1024 * 1024,
+  }
+}
+
 async fn process_unzip_file(
   import_task: &NotionImportTask,
   unzip_dir_path: &PathBuf,
@@ -552,14 +580,31 @@ async fn process_unzip_file(
     .await
     .map_err(|err| ImportError::Internal(anyhow!("Failed to upload files to S3: {:?}", err)))?;
 
+  // 3. delete zip file regardless of success or failure
+  match fs::remove_dir_all(unzip_dir_path).await {
+    Ok(_) => trace!("[Import]: {} deleted unzip file", import_task.workspace_id),
+    Err(err) => error!("Failed to delete unzip file: {:?}", err),
+  }
+
   Ok(())
 }
 
 async fn notify_user(
-  _import_task: &NotionImportTask,
-  _result: Result<(), ImportError>,
+  import_task: &NotionImportTask,
+  result: Result<(), ImportError>,
   _notifier: Arc<dyn ImportNotifier>,
 ) -> Result<(), ImportError> {
+  match result {
+    Ok(_) => {
+      trace!("[Import]: successfully imported: {}", import_task);
+    },
+    Err(err) => {
+      error!(
+        "[Import]: failed to import: {}, error: {:?}",
+        import_task, err
+      );
+    },
+  }
   // send email
   Ok(())
 }
@@ -721,9 +766,19 @@ pub struct NotionImportTask {
   pub task_id: Uuid,
   pub user_uuid: String,
   pub workspace_id: String,
+  pub workspace_name: String,
   pub s3_key: String,
   pub host: String,
 }
+impl Display for NotionImportTask {
+  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+    write!(
+      f,
+      "NotionImportTask {{ workspace_id: {}, workspace_name: {} }}",
+      self.workspace_id, self.workspace_name
+    )
+  }
+}
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
@@ -732,6 +787,19 @@ pub enum ImportTask {
   Notion(NotionImportTask),
   Custom(serde_json::Value),
 }
 
+impl Display for ImportTask {
+  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+    match self {
+      ImportTask::Notion(task) => write!(
+        f,
+        "NotionImportTask {{ workspace_id: {}, workspace_name: {} }}",
+        task.workspace_id, task.workspace_name
+      ),
+      ImportTask::Custom(value) => write!(f, "CustomTask {{ {} }}", value),
+    }
+  }
+}
+
 impl TryFrom<&StreamId> for ImportTask {
   type Error = ImportError;
 
diff --git a/services/appflowy-worker/src/s3_client.rs b/services/appflowy-worker/src/s3_client.rs
index 30af20fce..d5bd8db4c 100644
--- a/services/appflowy-worker/src/s3_client.rs
+++ b/services/appflowy-worker/src/s3_client.rs
@@ -10,7 +10,7 @@ use tokio_util::compat::TokioAsyncReadCompatExt;
 
 #[async_trait]
 pub trait S3Client: Send + Sync {
-  async fn get_blob(&self, object_key: &str) -> Result<S3StreamResponse, WorkerError>;
+  async fn get_blob_stream(&self, object_key: &str) -> Result<S3StreamResponse, WorkerError>;
   async fn put_blob(
     &self,
     object_key: &str,
@@ -36,7 +36,7 @@ impl Deref for S3ClientImpl {
 
 #[async_trait]
 impl S3Client for S3ClientImpl {
-  async fn get_blob(&self, object_key: &str) -> Result<S3StreamResponse, WorkerError> {
+  async fn get_blob_stream(&self, object_key: &str) -> Result<S3StreamResponse, WorkerError> {
     match self
       .inner
       .get_object()
@@ -48,9 +48,11 @@ impl S3Client for S3ClientImpl {
       Ok(output) => {
         let stream = output.body.into_async_read().compat();
         let content_type = output.content_type;
+        let content_length = output.content_length;
         Ok(S3StreamResponse {
           stream: Box::new(stream),
           content_type,
+          content_length,
         })
       },
       Err(SdkError::ServiceError(service_err)) => match service_err.err() {
@@ -118,4 +120,5 @@ impl S3Client for S3ClientImpl {
 pub struct S3StreamResponse {
   pub stream: Box<dyn AsyncBufRead + Unpin + Send>,
   pub content_type: Option<String>,
+  pub content_length: Option<i64>,
 }
diff --git a/services/appflowy-worker/tests/import_test.rs b/services/appflowy-worker/tests/import_test.rs
index 5651767be..aff333aac 100644
--- a/services/appflowy-worker/tests/import_test.rs
+++ b/services/appflowy-worker/tests/import_test.rs
@@ -203,7 +203,7 @@ struct MockS3Client;
 
 #[async_trait]
 impl S3Client for MockS3Client {
-  async fn get_blob(&self, _object_key: &str) -> Result<S3StreamResponse, WorkerError> {
+  async fn get_blob_stream(&self, _object_key: &str) -> Result<S3StreamResponse, WorkerError> {
     todo!()
   }
 
diff --git a/src/api/data_import.rs b/src/api/data_import.rs
index f01b6ee4c..7dc25fb70 100644
--- a/src/api/data_import.rs
+++ b/src/api/data_import.rs
@@ -16,7 +16,7 @@ use shared_entity::response::{AppResponse, JsonAppResponse};
 use std::env::temp_dir;
 use tokio::fs::File;
 use tokio::io::AsyncWriteExt;
-use tracing::{error, trace};
+use tracing::{error, info, trace};
 use uuid::Uuid;
 
 pub fn data_import_scope() -> Scope {
@@ -64,9 +64,14 @@ async fn import_data_handler(
 ) -> actix_web::Result<JsonAppResponse<()>> {
   let uid = state.user_cache.get_user_uid(&user_uuid).await?;
   let host = get_host_from_request(&req);
+  let content_length = req
+    .headers()
+    .get("X-Content-Length")
+    .and_then(|h| h.to_str().ok())
+    .and_then(|s| s.parse::<usize>().ok())
+    .unwrap_or(0);
 
-  let time = chrono::Local::now().format("%d/%m/%Y %H:%M").to_string();
-  let workspace_name = format!("import-{}", time);
+  let mut workspace_name = "".to_string();
 
   // file_name must be unique
   let file_name = format!("{}.zip", Uuid::new_v4());
@@ -76,6 +81,11 @@ async fn import_data_handler(
   let mut file = File::create(&file_path).await?;
   while let Some(item) = payload.next().await {
     let mut field = item?;
+    workspace_name = field
+      .content_disposition()
+      .and_then(|c| c.get_name().map(|f| f.to_string()))
+      .unwrap_or_else(|| format!("import-{}", chrono::Local::now().format("%d/%m/%Y %H:%M")));
+
     while let Some(chunk) = field.next().await {
       let data = chunk?;
       file_size += data.len();
@@ -85,6 +95,26 @@ async fn import_data_handler(
   file.shutdown().await?;
   drop(file);
 
+  if workspace_name.is_empty() {
+    return Err(AppError::InvalidRequest("Invalid file".to_string()).into());
+  }
+
+  if content_length != file_size {
+    trace!(
+      "Import file failed. The X-Content-Length:{} doesn't match the file size:{}",
+      content_length,
+      file_size
+    );
+
+    return Err(
+      AppError::InvalidRequest(format!(
+        "X-Content-Length:{} doesn't match file size:{}",
+        content_length, file_size
+      ))
+      .into(),
+    );
+  }
+
   let workspace = create_empty_workspace(
     &state.pg_pool,
     &state.workspace_access_control,
@@ -96,11 +126,9 @@ async fn import_data_handler(
   )
   .await?;
 
   let workspace_id = workspace.workspace_id.to_string();
-  trace!(
-    "User:{} import data:{} to new workspace:{}",
-    uid,
-    file_size,
-    workspace_id
+  info!(
+    "User:{} import data:{} to new workspace:{}, name:{}",
+    uid, file_size, workspace_id, workspace_name,
   );
   let stream = ByteStream::from_path(&file_path).await.map_err(|e| {
     AppError::Internal(anyhow!("Failed to create ByteStream from file path: {}", e))
   })?;
@@ -121,6 +149,7 @@ async fn import_data_handler(
     uid,
     &user_uuid,
     &workspace_id,
+    &workspace_name,
     file_size,
     &host,
     &state.redis_connection_manager,
diff --git a/src/biz/workspace/ops.rs b/src/biz/workspace/ops.rs
index 3f22b758b..4090c3f1c 100644
--- a/src/biz/workspace/ops.rs
+++ b/src/biz/workspace/ops.rs
@@ -694,10 +694,12 @@ async fn check_if_user_is_allowed_to_delete_comment(
   Ok(())
 }
 
+#[allow(clippy::too_many_arguments)]
 pub async fn create_upload_task(
   uid: i64,
   user_uuid: &UserUuid,
   workspace_id: &str,
+  workspace_name: &str,
   file_size: usize,
   host: &str,
   redis_client: &RedisConnectionManager,
@@ -725,6 +727,7 @@ pub async fn create_upload_task(
       "workspace_id": workspace_id,
       "s3_key": workspace_id,
       "host": host,
+      "workspace_name": workspace_name,
     }
   });
   let _: () = redis_client
diff --git a/tests/workspace/import_test.rs b/tests/workspace/import_test.rs
index 61fcf6629..0b696f244 100644
--- a/tests/workspace/import_test.rs
+++ b/tests/workspace/import_test.rs
@@ -18,7 +18,7 @@ async fn import_blog_post_test() {
   );
 
   let space_view = space_views.pop().unwrap();
-  assert_eq!(space_view.name, "blog_post");
+  assert_eq!(space_view.name, "Imported Space");
 
   let imported_view = folder.get_views_belong_to(&space_view.id).pop().unwrap();
   let document = client
@@ -62,7 +62,7 @@ async fn import_project_and_task_zip_test() {
     "Expected 1 view, found {:?}",
     space_views
   );
-  assert_eq!(space_views[0].name, "project&task");
+  assert_eq!(space_views[0].name, "Imported Space");
   assert!(space_views[0].space_info().is_some());
 
   let mut sub_views = folder.get_views_belong_to(&space_views[0].id);
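
Reviewer note: the buffer sizing heuristic in `buffer_size_from_content_length` is the piece of this change most worth pinning down with a test, since its thresholds are easy to regress silently. Below is a minimal sanity-check sketch against the exact thresholds introduced in this diff. It assumes the test module lives next to the function in `worker.rs` (the function is private) and that `content_length` is the `Option<i64>` exposed by `aws-sdk-s3`; the constants are copied from the diff, not from any published API.

```rust
#[cfg(test)]
mod buffer_size_tests {
  use super::buffer_size_from_content_length;

  const MB: i64 = 1024 * 1024;

  #[test]
  fn unknown_content_length_uses_smallest_buffer() {
    // With no content length reported by S3, fall back to the 3MB buffer.
    assert_eq!(buffer_size_from_content_length(None), 3 * 1024 * 1024);
  }

  #[test]
  fn buffer_grows_with_file_size() {
    // Files under 10MB get a 3MB buffer.
    assert_eq!(buffer_size_from_content_length(Some(MB)), 3 * 1024 * 1024);
    // Files between 10MB and 100MB get a 5MB buffer.
    assert_eq!(buffer_size_from_content_length(Some(50 * MB)), 5 * 1024 * 1024);
    // Files of 100MB and above get the 10MB buffer.
    assert_eq!(buffer_size_from_content_length(Some(500 * MB)), 10 * 1024 * 1024);
  }
}
```

One design note on the same change: `download_and_unzip_file` now wraps the S3 stream in `futures::io::BufReader::with_capacity(buffer_size, stream)` before handing it to `ZipFileReader`, so a too-small buffer no longer stalls the zip reader mid-entry; the premature-EOF failure mode that motivated the sizing is also surfaced explicitly in `unzip_async`, which reports "Unexpected EOF while reading" with the offending filename instead of a bare I/O error.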