diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 92db04cc722b5..9948f1c31b3a6 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1030,6 +1030,9 @@ pub struct S3ObjectStoreDeveloperConfig { default = "default::object_store_config::s3::developer::object_store_retryable_service_error_codes" )] pub object_store_retryable_service_error_codes: Vec, + + #[serde(default = "default::object_store_config::s3::developer::use_opendal")] + pub use_opendal: bool, } impl SystemConfig { @@ -1718,6 +1721,9 @@ pub mod default { } pub mod developer { + use crate::util::env_var::env_var_is_false_or; + const RW_USE_OPENDAL_FOR_S3: &str = "RW_USE_OPENDAL_FOR_S3"; + pub fn object_store_retry_unknown_service_error() -> bool { false } @@ -1725,6 +1731,14 @@ pub mod default { pub fn object_store_retryable_service_error_codes() -> Vec { vec!["SlowDown".into(), "TooManyRequests".into()] } + + pub fn use_opendal() -> bool { + // TODO: deprecate this config when we are completely switch from aws sdk to opendal. + // The reason why we use !env_var_is_false_or(RW_USE_OPENDAL_FOR_S3, false) here is + // 1. Maintain compatibility so that there is no behavior change in cluster with RW_USE_OPENDAL_FOR_S3 set. + // 2. Change the default behavior to use opendal for s3 if RW_USE_OPENDAL_FOR_S3 is not set. + !env_var_is_false_or(RW_USE_OPENDAL_FOR_S3, false) + } } } } diff --git a/src/config/example.toml b/src/config/example.toml index 25c2f4b200f8a..b6605b5305cc9 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -203,6 +203,7 @@ identity_resolution_timeout_s = 5 [storage.object_store.s3.developer] object_store_retry_unknown_service_error = false object_store_retryable_service_error_codes = ["SlowDown", "TooManyRequests"] +use_opendal = true [system] barrier_interval_ms = 1000 diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 89622e66895c9..c75447323c30e 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -32,7 +32,6 @@ use await_tree::InstrumentAwait; use futures::stream::BoxStream; use futures::StreamExt; pub use risingwave_common::config::ObjectStoreConfig; -use risingwave_common::util::env_var::env_var_is_false_or; pub use s3::*; pub mod error; @@ -50,8 +49,6 @@ use self::sim::SimObjectStore; pub type ObjectStoreRef = Arc; pub type ObjectStreamingUploader = MonitoredStreamingUploader; -const RW_USE_OPENDAL_FOR_S3: &str = "RW_USE_OPENDAL_FOR_S3"; - type BoxedStreamingUploader = Box; pub trait ObjectRangeBounds = RangeBounds + Clone + Send + Sync + std::fmt::Debug + 'static; @@ -797,8 +794,15 @@ pub async fn build_remote_object_store( ) -> ObjectStoreImpl { match url { s3 if s3.starts_with("s3://") => { - // only switch to s3 sdk when `RW_USE_OPENDAL_FOR_S3` is set to false. - if env_var_is_false_or(RW_USE_OPENDAL_FOR_S3, false) { + if config.s3.developer.use_opendal { + let bucket = s3.strip_prefix("s3://").unwrap(); + tracing::info!("Using OpenDAL to access s3, bucket is {}", bucket); + ObjectStoreImpl::Opendal( + OpendalObjectStore::new_s3_engine(bucket.to_string(), config.clone()) + .unwrap() + .monitored(metrics, config), + ) + } else { ObjectStoreImpl::S3( S3ObjectStore::new_with_config( s3.strip_prefix("s3://").unwrap().to_string(), @@ -808,14 +812,6 @@ pub async fn build_remote_object_store( .await .monitored(metrics, config), ) - } else { - let bucket = s3.strip_prefix("s3://").unwrap(); - tracing::info!("Using OpenDAL to access s3, bucket is {}", bucket); - ObjectStoreImpl::Opendal( - OpendalObjectStore::new_s3_engine(bucket.to_string(), config.clone()) - .unwrap() - .monitored(metrics, config), - ) } } #[cfg(feature = "hdfs-backend")]