Skip to content

Commit

Permalink
fix: pass correct object store config to monitored object store for a…
Browse files Browse the repository at this point in the history
…ll backends (#15260)
  • Loading branch information
hzxa21 authored Feb 26, 2024
1 parent 1dd2c3d commit 2cfe136
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 80 deletions.
101 changes: 37 additions & 64 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,20 @@ pub trait ObjectStore: Send + Sync {
/// specified in the request is not found, it will be considered as successfully deleted.
async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()>;

fn monitored(self, metrics: Arc<ObjectStoreMetrics>) -> MonitoredObjectStore<Self>
fn monitored(
self,
metrics: Arc<ObjectStoreMetrics>,
config: ObjectStoreConfig,
) -> MonitoredObjectStore<Self>
where
Self: Sized,
{
MonitoredObjectStore::new(self, metrics)
MonitoredObjectStore::new(self, metrics, config)
}

async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter>;

fn store_media_type(&self) -> &'static str;

fn recv_buffer_size(&self) -> usize {
// 2MB
1 << 21
}

fn config(&self) -> Option<&ObjectStoreConfig> {
// TODO: remove option
None
}
}

pub enum ObjectStoreImpl {
Expand Down Expand Up @@ -280,16 +274,6 @@ impl ObjectStoreImpl {
ObjectStoreImpl::Sim(_) => true,
}
}

pub fn recv_buffer_size(&self) -> usize {
match self {
ObjectStoreImpl::InMem(store) => store.recv_buffer_size(),
ObjectStoreImpl::Opendal(store) => store.recv_buffer_size(),
ObjectStoreImpl::S3(store) => store.recv_buffer_size(),
#[cfg(madsim)]
ObjectStoreImpl::Sim(store) => store.recv_buffer_size(),
}
}
}

fn try_update_failure_metric<T>(
Expand Down Expand Up @@ -526,29 +510,22 @@ pub struct MonitoredObjectStore<OS: ObjectStore> {
/// - start `operation_latency` timer
/// - `failure-count`
impl<OS: ObjectStore> MonitoredObjectStore<OS> {
pub fn new(store: OS, object_store_metrics: Arc<ObjectStoreMetrics>) -> Self {
if let Some(config) = store.config() {
Self {
object_store_metrics,
streaming_read_timeout: Some(Duration::from_millis(
config.object_store_streaming_read_timeout_ms,
)),
streaming_upload_timeout: Some(Duration::from_millis(
config.object_store_streaming_upload_timeout_ms,
)),
read_timeout: Some(Duration::from_millis(config.object_store_read_timeout_ms)),
upload_timeout: Some(Duration::from_millis(config.object_store_upload_timeout_ms)),
inner: store,
}
} else {
Self {
inner: store,
object_store_metrics,
streaming_read_timeout: None,
streaming_upload_timeout: None,
read_timeout: None,
upload_timeout: None,
}
pub fn new(
store: OS,
object_store_metrics: Arc<ObjectStoreMetrics>,
config: ObjectStoreConfig,
) -> Self {
Self {
object_store_metrics,
streaming_read_timeout: Some(Duration::from_millis(
config.object_store_streaming_read_timeout_ms,
)),
streaming_upload_timeout: Some(Duration::from_millis(
config.object_store_streaming_upload_timeout_ms,
)),
read_timeout: Some(Duration::from_millis(config.object_store_read_timeout_ms)),
upload_timeout: Some(Duration::from_millis(config.object_store_upload_timeout_ms)),
inner: store,
}
}

Expand Down Expand Up @@ -800,10 +777,6 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
try_update_failure_metric(&self.object_store_metrics, &res, operation_type);
res
}

fn recv_buffer_size(&self) -> usize {
self.inner.recv_buffer_size()
}
}

/// Creates a new [`ObjectStore`] from the given `url`. Credentials are configured via environment
Expand All @@ -823,19 +796,19 @@ pub async fn build_remote_object_store(
let bucket = s3.strip_prefix("s3://").unwrap();

ObjectStoreImpl::Opendal(
OpendalObjectStore::new_s3_engine(bucket.to_string(), config)
OpendalObjectStore::new_s3_engine(bucket.to_string(), config.clone())
.unwrap()
.monitored(metrics),
.monitored(metrics, config),
)
} else {
ObjectStoreImpl::S3(
S3ObjectStore::new_with_config(
s3.strip_prefix("s3://").unwrap().to_string(),
metrics.clone(),
config,
config.clone(),
)
.await
.monitored(metrics),
.monitored(metrics, config),
)
}
}
Expand All @@ -855,7 +828,7 @@ pub async fn build_remote_object_store(
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_gcs_engine(bucket.to_string(), root.to_string())
.unwrap()
.monitored(metrics),
.monitored(metrics, config),
)
}
obs if obs.starts_with("obs://") => {
Expand All @@ -864,7 +837,7 @@ pub async fn build_remote_object_store(
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_obs_engine(bucket.to_string(), root.to_string())
.unwrap()
.monitored(metrics),
.monitored(metrics, config),
)
}

Expand All @@ -874,7 +847,7 @@ pub async fn build_remote_object_store(
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_oss_engine(bucket.to_string(), root.to_string())
.unwrap()
.monitored(metrics),
.monitored(metrics, config),
)
}
webhdfs if webhdfs.starts_with("webhdfs://") => {
Expand All @@ -883,7 +856,7 @@ pub async fn build_remote_object_store(
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_webhdfs_engine(namenode.to_string(), root.to_string())
.unwrap()
.monitored(metrics),
.monitored(metrics, config),
)
}
azblob if azblob.starts_with("azblob://") => {
Expand All @@ -892,15 +865,15 @@ pub async fn build_remote_object_store(
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_azblob_engine(container_name.to_string(), root.to_string())
.unwrap()
.monitored(metrics),
.monitored(metrics, config),
)
}
fs if fs.starts_with("fs://") => {
let fs = fs.strip_prefix("fs://").unwrap();
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_fs_engine(fs.to_string())
.unwrap()
.monitored(metrics),
.monitored(metrics, config),
)
}

Expand All @@ -911,29 +884,29 @@ pub async fn build_remote_object_store(
panic!("Passing s3-compatible is not supported, please modify the environment variable and pass in s3.");
}
minio if minio.starts_with("minio://") => ObjectStoreImpl::S3(
S3ObjectStore::with_minio(minio, metrics.clone(), config)
S3ObjectStore::with_minio(minio, metrics.clone(), config.clone())
.await
.monitored(metrics),
.monitored(metrics, config),
),
"memory" => {
if ident == "Meta Backup" {
tracing::warn!("You're using in-memory remote object store for {}. This is not recommended for production environment.", ident);
} else {
tracing::warn!("You're using in-memory remote object store for {}. This should never be used in benchmarks and production environment.", ident);
}
ObjectStoreImpl::InMem(InMemObjectStore::new().monitored(metrics))
ObjectStoreImpl::InMem(InMemObjectStore::new().monitored(metrics, config))
}
"memory-shared" => {
if ident == "Meta Backup" {
tracing::warn!("You're using shared in-memory remote object store for {}. This should never be used in production environment.", ident);
} else {
tracing::warn!("You're using shared in-memory remote object store for {}. This should never be used in benchmarks and production environment.", ident);
}
ObjectStoreImpl::InMem(InMemObjectStore::shared().monitored(metrics))
ObjectStoreImpl::InMem(InMemObjectStore::shared().monitored(metrics, config))
}
#[cfg(madsim)]
sim if sim.starts_with("sim://") => {
ObjectStoreImpl::Sim(SimObjectStore::new(url).monitored(metrics))
ObjectStoreImpl::Sim(SimObjectStore::new(url).monitored(metrics, config))
}
other => {
unimplemented!(
Expand Down
11 changes: 0 additions & 11 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,17 +532,6 @@ impl ObjectStore for S3ObjectStore {
fn store_media_type(&self) -> &'static str {
"s3"
}

fn recv_buffer_size(&self) -> usize {
self.config
.s3
.object_store_recv_buffer_size
.unwrap_or(1 << 21)
}

fn config(&self) -> Option<&ObjectStoreConfig> {
Some(&self.config)
}
}

impl S3ObjectStore {
Expand Down
2 changes: 2 additions & 0 deletions src/storage/backup/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashSet;
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::{
InMemObjectStore, MonitoredObjectStore, ObjectError, ObjectStoreImpl, ObjectStoreRef,
Expand Down Expand Up @@ -191,6 +192,7 @@ pub async fn unused() -> ObjectStoreMetaSnapshotStorage {
Arc::new(ObjectStoreImpl::InMem(MonitoredObjectStore::new(
InMemObjectStore::new(),
Arc::new(ObjectStoreMetrics::unused()),
ObjectStoreConfig::default(),
))),
)
.await
Expand Down
7 changes: 5 additions & 2 deletions src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion};
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::TableId;
use risingwave_common::config::MetricLevel;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
Expand Down Expand Up @@ -49,7 +49,10 @@ use risingwave_storage::monitor::{
};

pub fn mock_sstable_store() -> SstableStoreRef {
let store = InMemObjectStore::new().monitored(Arc::new(ObjectStoreMetrics::unused()));
let store = InMemObjectStore::new().monitored(
Arc::new(ObjectStoreMetrics::unused()),
ObjectStoreConfig::default(),
);
let store = Arc::new(ObjectStoreImpl::InMem(store));
let path = "test".to_string();
Arc::new(SstableStore::new(SstableStoreConfig {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/benches/bench_multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ fn bench_builder(
ObjectStoreConfig::default(),
)
.await
.monitored(metrics)
.monitored(metrics, ObjectStoreConfig::default())
});
let object_store = Arc::new(ObjectStoreImpl::S3(object_store));
let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
Expand Down
7 changes: 5 additions & 2 deletions src/storage/src/hummock/iterator/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::config::MetricLevel;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, HummockSstableObjectId};
use risingwave_object_store::object::{
Expand Down Expand Up @@ -54,7 +54,10 @@ pub const TEST_KEYS_COUNT: usize = 10;

pub fn mock_sstable_store() -> SstableStoreRef {
mock_sstable_store_with_object_store(Arc::new(ObjectStoreImpl::InMem(
InMemObjectStore::new().monitored(Arc::new(ObjectStoreMetrics::unused())),
InMemObjectStore::new().monitored(
Arc::new(ObjectStoreMetrics::unused()),
ObjectStoreConfig::default(),
),
)))
}

Expand Down

0 comments on commit 2cfe136

Please sign in to comment.