From cfad6c83d14568deaf6388d186139b19e6642729 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sun, 10 Nov 2024 03:15:38 +0800 Subject: [PATCH 01/10] refactor: Use opendal to replace object_store Signed-off-by: Xuanwo --- Cargo.toml | 8 +-- src/bgworker/mod.rs | 4 +- src/bgworker/sqlt.rs | 12 ++-- src/boot.rs | 3 +- src/mailer/email_sender.rs | 3 +- src/storage/drivers/aws.rs | 59 ++++++------------- src/storage/drivers/azure.rs | 22 +++---- src/storage/drivers/gcp.rs | 25 +++----- src/storage/drivers/local.rs | 20 ++++--- src/storage/drivers/mem.rs | 11 +++- src/storage/drivers/mod.rs | 25 ++++++-- ...ct_store_adapter.rs => opendal_adapter.rs} | 58 ++++++++++-------- src/storage/mod.rs | 2 +- src/storage/strategies/backup.rs | 7 +-- src/storage/strategies/mirror.rs | 7 +-- 15 files changed, 132 insertions(+), 134 deletions(-) rename src/storage/drivers/{object_store_adapter.rs => opendal_adapter.rs} (59%) diff --git a/Cargo.toml b/Cargo.toml index ad57098a2..cbe2bab78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,9 +38,9 @@ with-db = ["dep:sea-orm", "dep:sea-orm-migration", "loco-gen/with-db"] channels = ["dep:socketioxide"] # Storage features all_storage = ["storage_aws_s3", "storage_azure", "storage_gcp"] -storage_aws_s3 = ["object_store/aws"] -storage_azure = ["object_store/azure"] -storage_gcp = ["object_store/gcp"] +storage_aws_s3 = ["opendal/services-s3"] +storage_azure = ["opendal/services-azblob"] +storage_gcp = ["opendal/services-gcs"] # Cache feature cache_inmem = ["dep:moka"] bg_redis = ["dep:rusty-sidekiq", "dep:bb8"] @@ -125,7 +125,7 @@ socketioxide = { version = "0.14.0", features = ["state"], optional = true } # File Upload -object_store = { version = "0.11.0", default-features = false } +opendal = { version = "0.50.2", default-features = false,features = ["services-memory","services-fs"] } # cache moka = { version = "0.12.7", features = ["sync"], optional = true } diff --git a/src/bgworker/mod.rs b/src/bgworker/mod.rs index 85b6edfc3..a2cce76a4 100644 --- a/src/bgworker/mod.rs +++ b/src/bgworker/mod.rs @@ -250,8 +250,8 @@ impl Queue { /// # Errors /// - /// Does not currently return an error, but the postgres or other future queue implementations - /// might, so using Result here as return type. + /// Does not currently return an error, but the postgres or other future + /// queue implementations might, so using Result here as return type. pub fn shutdown(&self) -> Result<()> { println!("waiting for running jobs to finish..."); match self { diff --git a/src/bgworker/sqlt.rs b/src/bgworker/sqlt.rs index 5d8c4b1d7..6fd450ff1 100644 --- a/src/bgworker/sqlt.rs +++ b/src/bgworker/sqlt.rs @@ -282,7 +282,8 @@ async fn dequeue(client: &SqlitePool) -> Result> { if let Some(task) = row { sqlx::query( - "UPDATE sqlt_loco_queue SET status = 'processing', updated_at = CURRENT_TIMESTAMP WHERE id = $1", + "UPDATE sqlt_loco_queue SET status = 'processing', updated_at = CURRENT_TIMESTAMP \ + WHERE id = $1", ) .bind(&task.id) .execute(&mut *tx) @@ -325,7 +326,8 @@ async fn complete_task( if let Some(interval_ms) = interval_ms { let next_run_at = Utc::now() + chrono::Duration::milliseconds(interval_ms); sqlx::query( - "UPDATE sqlt_loco_queue SET status = 'queued', updated_at = CURRENT_TIMESTAMP, run_at = DATETIME($1) WHERE id = $2", + "UPDATE sqlt_loco_queue SET status = 'queued', updated_at = CURRENT_TIMESTAMP, run_at \ + = DATETIME($1) WHERE id = $2", ) .bind(next_run_at) .bind(task_id) @@ -333,7 +335,8 @@ async fn complete_task( .await?; } else { sqlx::query( - "UPDATE sqlt_loco_queue SET status = 'completed', updated_at = CURRENT_TIMESTAMP WHERE id = $1", + "UPDATE sqlt_loco_queue SET status = 'completed', updated_at = CURRENT_TIMESTAMP \ + WHERE id = $1", ) .bind(task_id) .execute(pool) @@ -347,7 +350,8 @@ async fn fail_task(pool: &SqlitePool, task_id: &TaskId, error: &crate::Error) -> error!(err = msg, "failed task"); let error_json = serde_json::json!({ "error": msg }); sqlx::query( - "UPDATE sqlt_loco_queue SET status = 'failed', updated_at = CURRENT_TIMESTAMP, task_data = json_patch(task_data, $1) WHERE id = $2", + "UPDATE sqlt_loco_queue SET status = 'failed', updated_at = CURRENT_TIMESTAMP, task_data \ + = json_patch(task_data, $1) WHERE id = $2", ) .bind(error_json) .bind(task_id) diff --git a/src/boot.rs b/src/boot.rs index e663d44b4..0f544c480 100644 --- a/src/boot.rs +++ b/src/boot.rs @@ -6,8 +6,7 @@ use std::path::PathBuf; use axum::Router; #[cfg(feature = "with-db")] use sea_orm_migration::MigratorTrait; -use tokio::task::JoinHandle; -use tokio::{select, signal}; +use tokio::{select, signal, task::JoinHandle}; use tracing::{debug, error, info, warn}; #[cfg(feature = "with-db")] diff --git a/src/mailer/email_sender.rs b/src/mailer/email_sender.rs index 61fe0474b..e392365b5 100644 --- a/src/mailer/email_sender.rs +++ b/src/mailer/email_sender.rs @@ -93,7 +93,8 @@ impl EmailSender { /// /// # Errors /// - /// When email doesn't send successfully or has an error to build the message + /// When email doesn't send successfully or has an error to build the + /// message pub async fn mail(&self, email: &Email) -> Result<()> { let content = MultiPart::alternative_plain_html(email.text.clone(), email.html.clone()); let mut builder = Message::builder() diff --git a/src/storage/drivers/aws.rs b/src/storage/drivers/aws.rs index 5cc2be526..5668e3a4c 100644 --- a/src/storage/drivers/aws.rs +++ b/src/storage/drivers/aws.rs @@ -1,16 +1,7 @@ -#[cfg(test)] -use core::time::Duration; -use std::sync::Arc; - -use object_store::{ - aws::{AmazonS3Builder, AwsCredential}, - StaticCredentialProvider, -}; -#[cfg(test)] -use object_store::{BackoffConfig, RetryConfig}; +use opendal::{services::S3, Operator}; -use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver}; -use crate::Result; +use super::{opendal_adapter::OpendalAdapter, StoreDriver}; +use crate::storage::StorageResult; /// A set of AWS security credentials #[derive(Debug)] @@ -34,14 +25,10 @@ pub struct Credential { /// # Errors /// /// When could not initialize the client instance -pub fn new(bucket_name: &str, region: &str) -> Result> { - let s3 = AmazonS3Builder::new() - .with_bucket_name(bucket_name) - .with_region(region) - .build() - .map_err(Box::from)?; +pub fn new(bucket_name: &str, region: &str) -> StorageResult> { + let s3 = S3::default().bucket(bucket_name).region(region); - Ok(Box::new(ObjectStoreAdapter::new(Box::new(s3)))) + Ok(Box::new(OpendalAdapter::new(Operator::new(s3)?.finish()))) } /// Create new AWS s3 storage with bucket, region and credentials. @@ -64,18 +51,16 @@ pub fn with_credentials( bucket_name: &str, region: &str, credentials: Credential, -) -> Result> { - let s3 = AmazonS3Builder::new() - .with_bucket_name(bucket_name) - .with_region(region) - .with_credentials(Arc::new(StaticCredentialProvider::new(AwsCredential { - key_id: credentials.key_id.to_string(), - secret_key: credentials.secret_key.to_string(), - token: credentials.token, - }))) - .build() - .map_err(Box::from)?; - Ok(Box::new(ObjectStoreAdapter::new(Box::new(s3)))) +) -> StorageResult> { + let mut s3 = S3::default() + .bucket(bucket_name) + .region(region) + .access_key_id(&credentials.key_id) + .secret_access_key(&credentials.secret_key); + if let Some(token) = credentials.token { + s3 = s3.session_token(&token); + } + Ok(Box::new(OpendalAdapter::new(Operator::new(s3)?.finish()))) } /// Build store with failure @@ -86,15 +71,7 @@ pub fn with_credentials( #[cfg(test)] #[must_use] pub fn with_failure() -> Box { - let s3 = AmazonS3Builder::new() - .with_bucket_name("loco-test") - .with_retry(RetryConfig { - backoff: BackoffConfig::default(), - max_retries: 0, - retry_timeout: Duration::from_secs(0), - }) - .build() - .unwrap(); + let s3 = S3::default().bucket("loco-test"); - Box::new(ObjectStoreAdapter::new(Box::new(s3))) + Box::new(OpendalAdapter::new(Operator::new(s3).unwrap().finish())) } diff --git a/src/storage/drivers/azure.rs b/src/storage/drivers/azure.rs index d2831c6c9..52d75e85e 100644 --- a/src/storage/drivers/azure.rs +++ b/src/storage/drivers/azure.rs @@ -1,7 +1,7 @@ -use object_store::azure::MicrosoftAzureBuilder; +use opendal::{services::Azblob, Operator}; -use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver}; -use crate::Result; +use super::StoreDriver; +use crate::storage::{drivers::opendal_adapter::OpendalAdapter, StorageResult}; /// Create new Azure storage. /// @@ -18,13 +18,13 @@ pub fn new( container_name: &str, account_name: &str, access_key: &str, -) -> Result> { - let azure = MicrosoftAzureBuilder::new() - .with_container_name(container_name) - .with_account(account_name) - .with_access_key(access_key) - .build() - .map_err(Box::from)?; +) -> StorageResult> { + let azure = Azblob::default() + .container(container_name) + .account_name(account_name) + .account_key(access_key); - Ok(Box::new(ObjectStoreAdapter::new(Box::new(azure)))) + Ok(Box::new(OpendalAdapter::new( + Operator::new(azure)?.finish(), + ))) } diff --git a/src/storage/drivers/gcp.rs b/src/storage/drivers/gcp.rs index 7c7426342..59f74a117 100644 --- a/src/storage/drivers/gcp.rs +++ b/src/storage/drivers/gcp.rs @@ -1,30 +1,23 @@ -use object_store::gcp::GoogleCloudStorageBuilder; +use opendal::{services::Gcs, Operator}; -use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver}; -use crate::Result; +use super::StoreDriver; +use crate::storage::{drivers::opendal_adapter::OpendalAdapter, StorageResult}; /// Create new GCP storage. /// /// # Examples ///``` /// use loco_rs::storage::drivers::gcp; -/// let gcp_driver = gcp::new("key", "account_key", "service_account"); +/// let gcp_driver = gcp::new("key", "credential_path"); /// ``` /// /// # Errors /// /// When could not initialize the client instance -pub fn new( - bucket_name: &str, - service_account_key: &str, - service_account: &str, -) -> Result> { - let gcs = GoogleCloudStorageBuilder::new() - .with_bucket_name(bucket_name) - .with_service_account_key(service_account_key) - .with_service_account_path(service_account) - .build() - .map_err(Box::from)?; +pub fn new(bucket_name: &str, credential_path: &str) -> StorageResult> { + let gcs = Gcs::default() + .bucket(bucket_name) + .credential_path(credential_path); - Ok(Box::new(ObjectStoreAdapter::new(Box::new(gcs)))) + Ok(Box::new(OpendalAdapter::new(Operator::new(gcs)?.finish()))) } diff --git a/src/storage/drivers/local.rs b/src/storage/drivers/local.rs index f46f3a1fa..b332a288e 100644 --- a/src/storage/drivers/local.rs +++ b/src/storage/drivers/local.rs @@ -1,7 +1,7 @@ -use object_store::local::LocalFileSystem; +use opendal::{services::Fs, Operator}; -use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver}; -use crate::Result; +use super::StoreDriver; +use crate::storage::{drivers::opendal_adapter::OpendalAdapter, StorageResult}; /// Create new filesystem storage with no prefix /// @@ -12,7 +12,12 @@ use crate::Result; /// ``` #[must_use] pub fn new() -> Box { - Box::new(ObjectStoreAdapter::new(Box::new(LocalFileSystem::new()))) + let fs = Fs::default(); + Box::new(OpendalAdapter::new( + Operator::new(fs) + .expect("fs service should build with success") + .finish(), + )) } /// Create new filesystem storage with `prefix` applied to all paths @@ -26,8 +31,7 @@ pub fn new() -> Box { /// # Errors /// /// Returns an error if the path does not exist -pub fn new_with_prefix(prefix: impl AsRef) -> Result> { - Ok(Box::new(ObjectStoreAdapter::new(Box::new( - LocalFileSystem::new_with_prefix(prefix).map_err(Box::from)?, - )))) +pub fn new_with_prefix(prefix: impl AsRef) -> StorageResult> { + let fs = Fs::default().root(&prefix.as_ref().display().to_string()); + Ok(Box::new(OpendalAdapter::new(Operator::new(fs)?.finish()))) } diff --git a/src/storage/drivers/mem.rs b/src/storage/drivers/mem.rs index 665aca4af..1ba95a6a1 100644 --- a/src/storage/drivers/mem.rs +++ b/src/storage/drivers/mem.rs @@ -1,6 +1,7 @@ -use object_store::memory::InMemory; +use opendal::{services::Memory, Operator}; -use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver}; +use super::StoreDriver; +use crate::storage::drivers::opendal_adapter::OpendalAdapter; /// Create new in-memory storage. /// @@ -11,5 +12,9 @@ use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver}; /// ``` #[must_use] pub fn new() -> Box { - Box::new(ObjectStoreAdapter::new(Box::new(InMemory::new()))) + Box::new(OpendalAdapter::new( + Operator::new(Memory::default()) + .expect("memory service must build with success") + .finish(), + )) } diff --git a/src/storage/drivers/mod.rs b/src/storage/drivers/mod.rs index 7d2ecfe3f..b956c7d74 100644 --- a/src/storage/drivers/mod.rs +++ b/src/storage/drivers/mod.rs @@ -2,6 +2,8 @@ use std::path::Path; use async_trait::async_trait; use bytes::Bytes; +use opendal::Reader; + #[cfg(feature = "storage_aws_s3")] pub mod aws; #[cfg(feature = "storage_azure")] @@ -11,7 +13,7 @@ pub mod gcp; pub mod local; pub mod mem; pub mod null; -pub mod object_store_adapter; +pub mod opendal_adapter; use super::StorageResult; @@ -21,9 +23,24 @@ pub struct UploadResponse { pub version: Option, } -// TODO: need to properly abstract the object_store type in order to not -// strongly depend on it -pub type GetResponse = object_store::GetResult; +/// TODO: Add more methods to `GetResponse` to read the content in different +/// ways +/// +/// For example, we can read a specific range of bytes from the stream. +pub struct GetResponse { + stream: Reader, +} + +impl GetResponse { + pub(crate) fn new(stream: Reader) -> Self { + Self { stream } + } + + /// Read all content from the stream and return as `Bytes`. + pub async fn bytes(&self) -> StorageResult { + Ok(self.stream.read(..).await?.to_bytes()) + } +} #[async_trait] pub trait StoreDriver: Sync + Send { diff --git a/src/storage/drivers/object_store_adapter.rs b/src/storage/drivers/opendal_adapter.rs similarity index 59% rename from src/storage/drivers/object_store_adapter.rs rename to src/storage/drivers/opendal_adapter.rs index 1ad9f0a49..e3cd9c977 100644 --- a/src/storage/drivers/object_store_adapter.rs +++ b/src/storage/drivers/opendal_adapter.rs @@ -2,25 +2,28 @@ use std::path::Path; use async_trait::async_trait; use bytes::Bytes; -use object_store::ObjectStore; +use opendal::{layers::RetryLayer, Operator}; use super::{GetResponse, StoreDriver, UploadResponse}; use crate::storage::StorageResult; -pub struct ObjectStoreAdapter { - object_store_impl: Box, +pub struct OpendalAdapter { + opendal_impl: Operator, } -impl ObjectStoreAdapter { +impl OpendalAdapter { /// Constructor for creating a new `Store` instance. #[must_use] - pub fn new(object_store_impl: Box) -> Self { - Self { object_store_impl } + pub fn new(opendal_impl: Operator) -> Self { + let opendal_impl = opendal_impl + // Add retry layer with default settings + .layer(RetryLayer::default().with_jitter()); + Self { opendal_impl } } } #[async_trait] -impl StoreDriver for ObjectStoreAdapter { +impl StoreDriver for OpendalAdapter { /// Uploads the content represented by `Bytes` to the specified path in the /// object store. /// @@ -28,14 +31,14 @@ impl StoreDriver for ObjectStoreAdapter { /// /// Returns a `StorageResult` with the result of the upload operation. async fn upload(&self, path: &Path, content: &Bytes) -> StorageResult { - let path = object_store::path::Path::from(path.display().to_string()); - let res = self - .object_store_impl - .put(&path, content.clone().into()) + let _ = self + .opendal_impl + .write(&path.display().to_string(), content.clone()) .await?; + // TODO: opendal will return the e_tag and version in the future Ok(UploadResponse { - e_tag: res.e_tag, - version: res.version, + e_tag: None, + version: None, }) } @@ -45,8 +48,11 @@ impl StoreDriver for ObjectStoreAdapter { /// /// Returns a `StorageResult` with the result of the retrieval operation. async fn get(&self, path: &Path) -> StorageResult { - let path = object_store::path::Path::from(path.display().to_string()); - Ok(self.object_store_impl.get(&path).await?) + let r = self + .opendal_impl + .reader(&path.display().to_string()) + .await?; + Ok(GetResponse::new(r)) } /// Deletes the content at the specified path in the object store. @@ -56,8 +62,10 @@ impl StoreDriver for ObjectStoreAdapter { /// Returns a `StorageResult` indicating the success of the deletion /// operation. async fn delete(&self, path: &Path) -> StorageResult<()> { - let path = object_store::path::Path::from(path.display().to_string()); - Ok(self.object_store_impl.delete(&path).await?) + Ok(self + .opendal_impl + .delete(&path.display().to_string()) + .await?) } /// Renames or moves the content from one path to another in the object @@ -68,9 +76,9 @@ impl StoreDriver for ObjectStoreAdapter { /// Returns a `StorageResult` indicating the success of the rename/move /// operation. async fn rename(&self, from: &Path, to: &Path) -> StorageResult<()> { - let from = object_store::path::Path::from(from.display().to_string()); - let to = object_store::path::Path::from(to.display().to_string()); - Ok(self.object_store_impl.rename(&from, &to).await?) + let from = from.display().to_string(); + let to = to.display().to_string(); + Ok(self.opendal_impl.rename(&from, &to).await?) } /// Copies the content from one path to another in the object store. @@ -79,9 +87,9 @@ impl StoreDriver for ObjectStoreAdapter { /// /// Returns a `StorageResult` indicating the success of the copy operation. async fn copy(&self, from: &Path, to: &Path) -> StorageResult<()> { - let from = object_store::path::Path::from(from.display().to_string()); - let to = object_store::path::Path::from(to.display().to_string()); - Ok(self.object_store_impl.copy(&from, &to).await?) + let from = from.display().to_string(); + let to = to.display().to_string(); + Ok(self.opendal_impl.copy(&from, &to).await?) } /// Checks if the content exists at the specified path in the object store. @@ -91,7 +99,7 @@ impl StoreDriver for ObjectStoreAdapter { /// Returns a `StorageResult` with a boolean indicating the existence of the /// content. async fn exists(&self, path: &Path) -> StorageResult { - let path = object_store::path::Path::from(path.display().to_string()); - Ok(self.object_store_impl.get(&path).await.is_ok()) + let path = path.display().to_string(); + Ok(self.opendal_impl.exists(&path).await?) } } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index dc8db80d8..1bd2f8d02 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -29,7 +29,7 @@ pub enum StorageError { StoreNotFound(String), #[error(transparent)] - Store(#[from] object_store::Error), + Store(#[from] opendal::Error), #[error("Unable to read data from file {}", path.display().to_string())] UnableToReadBytes { path: PathBuf }, diff --git a/src/storage/strategies/backup.rs b/src/storage/strategies/backup.rs index d53e84c8a..5e5c4f0f1 100644 --- a/src/storage/strategies/backup.rs +++ b/src/storage/strategies/backup.rs @@ -90,12 +90,7 @@ impl StorageStrategy for BackupStrategy { /// Downloads content only from primary storage backend. async fn download(&self, storage: &Storage, path: &Path) -> StorageResult { let store = storage.as_store_err(&self.primary)?; - Ok(store - .get(path) - .await? - .bytes() - .await - .map_err(StorageError::Store)?) + Ok(store.get(path).await?.bytes().await?) } /// Deletes content from the primary and, if configured, secondary storage diff --git a/src/storage/strategies/mirror.rs b/src/storage/strategies/mirror.rs index ff30286ba..e26bfa028 100644 --- a/src/storage/strategies/mirror.rs +++ b/src/storage/strategies/mirror.rs @@ -231,12 +231,7 @@ impl MirrorStrategy { path: &Path, ) -> StorageResult { let store = storage.as_store_err(store_name)?; - store - .get(path) - .await? - .bytes() - .await - .map_err(StorageError::Store) + store.get(path).await?.bytes().await } } From 96b20a74ab30e1e59202d217a7f8e4f1e121d9bf Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 12 Nov 2024 00:43:29 +0800 Subject: [PATCH 02/10] Fix clippy Signed-off-by: Xuanwo --- src/storage/drivers/local.rs | 4 ++++ src/storage/drivers/mem.rs | 4 ++++ src/storage/drivers/mod.rs | 4 ++++ src/storage/drivers/opendal_adapter.rs | 3 +-- 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/storage/drivers/local.rs b/src/storage/drivers/local.rs index b332a288e..31f0788d6 100644 --- a/src/storage/drivers/local.rs +++ b/src/storage/drivers/local.rs @@ -10,6 +10,10 @@ use crate::storage::{drivers::opendal_adapter::OpendalAdapter, StorageResult}; /// use loco_rs::storage::drivers::local; /// let file_system_driver = local::new(); /// ``` +/// +/// # Panics +/// +/// Panics if the filesystem service built failed. #[must_use] pub fn new() -> Box { let fs = Fs::default(); diff --git a/src/storage/drivers/mem.rs b/src/storage/drivers/mem.rs index 1ba95a6a1..0d0f31410 100644 --- a/src/storage/drivers/mem.rs +++ b/src/storage/drivers/mem.rs @@ -10,6 +10,10 @@ use crate::storage::drivers::opendal_adapter::OpendalAdapter; /// use loco_rs::storage::drivers::mem; /// let mem_storage = mem::new(); /// ``` +/// +/// # Panics +/// +/// Panics if the memory service built failed. #[must_use] pub fn new() -> Box { Box::new(OpendalAdapter::new( diff --git a/src/storage/drivers/mod.rs b/src/storage/drivers/mod.rs index b956c7d74..e1aa0ccb2 100644 --- a/src/storage/drivers/mod.rs +++ b/src/storage/drivers/mod.rs @@ -37,6 +37,10 @@ impl GetResponse { } /// Read all content from the stream and return as `Bytes`. + /// + /// # Errors + /// + /// Returns a `StorageError` with the reason for the failure. pub async fn bytes(&self) -> StorageResult { Ok(self.stream.read(..).await?.to_bytes()) } diff --git a/src/storage/drivers/opendal_adapter.rs b/src/storage/drivers/opendal_adapter.rs index e3cd9c977..fe0062e2b 100644 --- a/src/storage/drivers/opendal_adapter.rs +++ b/src/storage/drivers/opendal_adapter.rs @@ -31,8 +31,7 @@ impl StoreDriver for OpendalAdapter { /// /// Returns a `StorageResult` with the result of the upload operation. async fn upload(&self, path: &Path, content: &Bytes) -> StorageResult { - let _ = self - .opendal_impl + self.opendal_impl .write(&path.display().to_string(), content.clone()) .await?; // TODO: opendal will return the e_tag and version in the future From 02bcf38b5f693f3a80d99d76885774b4b68e1be4 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 21 Nov 2024 15:16:14 +0800 Subject: [PATCH 03/10] Fix aws test Signed-off-by: Xuanwo --- src/storage/drivers/aws.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/drivers/aws.rs b/src/storage/drivers/aws.rs index 5668e3a4c..0938f986d 100644 --- a/src/storage/drivers/aws.rs +++ b/src/storage/drivers/aws.rs @@ -71,7 +71,7 @@ pub fn with_credentials( #[cfg(test)] #[must_use] pub fn with_failure() -> Box { - let s3 = S3::default().bucket("loco-test"); + let s3 = S3::default().bucket("loco-test").region("ap-south-1"); Box::new(OpendalAdapter::new(Operator::new(s3).unwrap().finish())) } From bc9d81ec62d081844a5884b9fbc2febf97271d81 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 21 Nov 2024 15:28:51 +0800 Subject: [PATCH 04/10] Fix fs test Signed-off-by: Xuanwo --- src/storage/drivers/local.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/drivers/local.rs b/src/storage/drivers/local.rs index 31f0788d6..06d1a8c12 100644 --- a/src/storage/drivers/local.rs +++ b/src/storage/drivers/local.rs @@ -16,7 +16,7 @@ use crate::storage::{drivers::opendal_adapter::OpendalAdapter, StorageResult}; /// Panics if the filesystem service built failed. #[must_use] pub fn new() -> Box { - let fs = Fs::default(); + let fs = Fs::default().root("/"); Box::new(OpendalAdapter::new( Operator::new(fs) .expect("fs service should build with success") From ee0776cd0384a766e04ec44d02a6f52eab5209fc Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 21 Nov 2024 15:47:33 +0800 Subject: [PATCH 05/10] Add fallback for services doesn't have copy and rename support Signed-off-by: Xuanwo --- src/storage/drivers/opendal_adapter.rs | 44 +++++++++++++++++++++++--- 1 file changed, 39 insertions(+), 5 deletions(-) diff --git a/src/storage/drivers/opendal_adapter.rs b/src/storage/drivers/opendal_adapter.rs index fe0062e2b..7a753b6b8 100644 --- a/src/storage/drivers/opendal_adapter.rs +++ b/src/storage/drivers/opendal_adapter.rs @@ -2,10 +2,11 @@ use std::path::Path; use async_trait::async_trait; use bytes::Bytes; +use futures_util::SinkExt; use opendal::{layers::RetryLayer, Operator}; use super::{GetResponse, StoreDriver, UploadResponse}; -use crate::storage::StorageResult; +use crate::storage::{StorageError, StorageResult}; pub struct OpendalAdapter { opendal_impl: Operator, @@ -70,25 +71,58 @@ impl StoreDriver for OpendalAdapter { /// Renames or moves the content from one path to another in the object /// store. /// + /// # Behavior + /// + /// Fallback to copy and delete source if the storage does not support rename. + /// /// # Errors /// /// Returns a `StorageResult` indicating the success of the rename/move /// operation. async fn rename(&self, from: &Path, to: &Path) -> StorageResult<()> { - let from = from.display().to_string(); - let to = to.display().to_string(); - Ok(self.opendal_impl.rename(&from, &to).await?) + if self.opendal_impl.info().full_capability().rename { + let from = from.display().to_string(); + let to = to.display().to_string(); + Ok(self.opendal_impl.rename(&from, &to).await?) + } else { + self.copy(&from, &to).await?; + self.delete(from).await?; + Ok(()) + } } /// Copies the content from one path to another in the object store. /// + /// # Behavior + /// + /// Fallback to read from source and write into dest if the storage does not support copy. + /// /// # Errors /// /// Returns a `StorageResult` indicating the success of the copy operation. async fn copy(&self, from: &Path, to: &Path) -> StorageResult<()> { let from = from.display().to_string(); let to = to.display().to_string(); - Ok(self.opendal_impl.copy(&from, &to).await?) + if self.opendal_impl.info().full_capability().copy { + Ok(self.opendal_impl.copy(&from, &to).await?) + } else { + let mut reader = self + .opendal_impl + .reader(&from) + .await? + .into_bytes_stream(..) + .await?; + let mut writer = self.opendal_impl.writer(&to).await?.into_bytes_sink(); + writer + .send_all(&mut reader) + .await + .map_err(|err| StorageError::Any(Box::new(err)))?; + writer + .close() + .await + .map_err(|err| StorageError::Any(Box::new(err)))?; + Ok(()) + } } /// Checks if the content exists at the specified path in the object store. From a68d2d0cea0da5a974bb77a00cb87bdd30f14774 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 27 Nov 2024 17:06:28 +0800 Subject: [PATCH 06/10] Fix style Signed-off-by: Xuanwo --- src/storage/drivers/opendal_adapter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/drivers/opendal_adapter.rs b/src/storage/drivers/opendal_adapter.rs index 7a753b6b8..23ee1ee7b 100644 --- a/src/storage/drivers/opendal_adapter.rs +++ b/src/storage/drivers/opendal_adapter.rs @@ -85,7 +85,7 @@ impl StoreDriver for OpendalAdapter { let to = to.display().to_string(); Ok(self.opendal_impl.rename(&from, &to).await?) } else { - self.copy(&from, &to).await?; + self.copy(from, to).await?; self.delete(from).await?; Ok(()) } From 3cf1ce4eb415f4cdf2bff79acb99608797fa4e6e Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 27 Nov 2024 22:56:25 +0800 Subject: [PATCH 07/10] Don't load imds in CI Signed-off-by: Xuanwo --- src/storage/drivers/aws.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/storage/drivers/aws.rs b/src/storage/drivers/aws.rs index 0938f986d..6d0752fae 100644 --- a/src/storage/drivers/aws.rs +++ b/src/storage/drivers/aws.rs @@ -71,7 +71,10 @@ pub fn with_credentials( #[cfg(test)] #[must_use] pub fn with_failure() -> Box { - let s3 = S3::default().bucket("loco-test").region("ap-south-1"); + let s3 = S3::default() + .bucket("loco-test") + .region("ap-south-1") + .disable_ec2_metadata(); Box::new(OpendalAdapter::new(Operator::new(s3).unwrap().finish())) } From 9df0f1557f2d3596652d1567247c20f09229b231 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 5 Dec 2024 01:44:52 +0800 Subject: [PATCH 08/10] Add allow anonymouse Signed-off-by: Xuanwo --- src/storage/drivers/aws.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/storage/drivers/aws.rs b/src/storage/drivers/aws.rs index 6d0752fae..142a10d51 100644 --- a/src/storage/drivers/aws.rs +++ b/src/storage/drivers/aws.rs @@ -74,6 +74,7 @@ pub fn with_failure() -> Box { let s3 = S3::default() .bucket("loco-test") .region("ap-south-1") + .allow_anonymous() .disable_ec2_metadata(); Box::new(OpendalAdapter::new(Operator::new(s3).unwrap().finish())) From cdc4763ee429718c9d461f6ab0d10a81d49c9a8d Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 13 Dec 2024 16:44:16 +0800 Subject: [PATCH 09/10] Fix exists handling Signed-off-by: Xuanwo --- src/storage/drivers/opendal_adapter.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/storage/drivers/opendal_adapter.rs b/src/storage/drivers/opendal_adapter.rs index 23ee1ee7b..4fb1ca085 100644 --- a/src/storage/drivers/opendal_adapter.rs +++ b/src/storage/drivers/opendal_adapter.rs @@ -131,8 +131,14 @@ impl StoreDriver for OpendalAdapter { /// /// Returns a `StorageResult` with a boolean indicating the existence of the /// content. + /// + /// # TODO + /// + /// The `exists` function should return an error for issues such as permission denied. + /// However, these errors are not handled during the migration process and should be addressed + /// after the test suites are refactored. async fn exists(&self, path: &Path) -> StorageResult { let path = path.display().to_string(); - Ok(self.opendal_impl.exists(&path).await?) + Ok(self.opendal_impl.exists(&path).await.is_ok()) } } From 8c71fb13f4f3fba385151560d518c61e29fd49f1 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 17 Dec 2024 00:45:27 +0800 Subject: [PATCH 10/10] Fix wrong behavior Signed-off-by: Xuanwo --- src/storage/drivers/opendal_adapter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/drivers/opendal_adapter.rs b/src/storage/drivers/opendal_adapter.rs index 4fb1ca085..4153c3c12 100644 --- a/src/storage/drivers/opendal_adapter.rs +++ b/src/storage/drivers/opendal_adapter.rs @@ -139,6 +139,6 @@ impl StoreDriver for OpendalAdapter { /// after the test suites are refactored. async fn exists(&self, path: &Path) -> StorageResult { let path = path.display().to_string(); - Ok(self.opendal_impl.exists(&path).await.is_ok()) + Ok(self.opendal_impl.exists(&path).await.unwrap_or(false)) } }