From ae0fe6a8650e043fc2a63746eb107dd00f6ce47c Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Mon, 20 Nov 2023 13:48:27 +0000 Subject: [PATCH 1/4] Implement O_DIRECT for open to bypass metadata cache if enabled Signed-off-by: Daniel Carl Jones --- mountpoint-s3/src/fs.rs | 12 ++- .../tests/fuse_tests/consistency_test.rs | 95 +++++++++++++++++++ 2 files changed, 105 insertions(+), 2 deletions(-) diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs index 0feb54a65..0d5b00425 100644 --- a/mountpoint-s3/src/fs.rs +++ b/mountpoint-s3/src/fs.rs @@ -8,6 +8,7 @@ use std::time::{Duration, UNIX_EPOCH}; use time::OffsetDateTime; use tracing::{debug, error, trace}; +use fuser::consts::FOPEN_DIRECT_IO; use fuser::{FileAttr, KernelConfig}; use mountpoint_s3_client::error::{GetObjectError, ObjectClientError}; use mountpoint_s3_client::types::ETag; @@ -562,7 +563,12 @@ where pub async fn open(&self, ino: InodeNo, flags: i32, pid: u32) -> Result { trace!("fs:open with ino {:?} flags {:?} pid {:?}", ino, flags, pid); - let force_revalidate = !self.config.cache_config.serve_lookup_from_cache; + #[cfg(not(target_os = "linux"))] + let direct_io = false; + #[cfg(target_os = "linux")] + let direct_io = flags & libc::O_DIRECT != 0; + + let force_revalidate = !self.config.cache_config.serve_lookup_from_cache || direct_io; let lookup = self.superblock.getattr(&self.client, ino, force_revalidate).await?; match lookup.inode.kind() { @@ -596,7 +602,9 @@ where }; self.file_handles.write().await.insert(fh, Arc::new(handle)); - Ok(Opened { fh, flags: 0 }) + let reply_flags = if direct_io { FOPEN_DIRECT_IO } else { 0 }; + + Ok(Opened { fh, flags: reply_flags }) } #[allow(clippy::too_many_arguments)] // We don't get to choose this interface diff --git a/mountpoint-s3/tests/fuse_tests/consistency_test.rs b/mountpoint-s3/tests/fuse_tests/consistency_test.rs index 393d6f433..ffc1b08ea 100644 --- a/mountpoint-s3/tests/fuse_tests/consistency_test.rs +++ b/mountpoint-s3/tests/fuse_tests/consistency_test.rs @@ -83,3 +83,98 @@ fn page_cache_sharing_test_mock_with_cache(prefix: &str) { prefix, ); } + +#[cfg(target_os = "linux")] +mod direct_io { + use super::*; + + use std::fs::OpenOptions; + use std::os::unix::fs::OpenOptionsExt; + use std::time::Duration; + + use test_case::test_case; + + use mountpoint_s3::fs::{CacheConfig, S3FilesystemConfig}; + + fn cache_and_direct_io_test(creator_fn: F, prefix: &str) + where + F: FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox), + { + const OBJECT_SIZE: usize = 8; + + let test_session_conf = TestSessionConfig { + filesystem_config: S3FilesystemConfig { + cache_config: CacheConfig { + serve_lookup_from_cache: true, + dir_ttl: Duration::from_secs(600), + file_ttl: Duration::from_secs(600), + }, + ..Default::default() + }, + ..Default::default() + }; + let (mount_point, _session, mut test_client) = creator_fn(prefix, test_session_conf); + + let file_name = "file.bin"; + + // Create the first version of the file + let old_contents = vec![0x0fu8; OBJECT_SIZE]; + test_client.put_object(file_name, &old_contents).unwrap(); + + // Open and read fully the file before updating it remotely + let old_file = File::open(mount_point.path().join(file_name)).unwrap(); + let mut buf = vec![0u8; OBJECT_SIZE]; + old_file.read_exact_at(&mut buf, 0).unwrap(); + assert_eq!(buf, &old_contents[..buf.len()]); + + let new_contents = vec![0xffu8; OBJECT_SIZE]; + test_client.put_object(file_name, &new_contents).unwrap(); + + // Open the file again, which should be reading from cache + for _ in 0..2 { + let new_file = File::open(mount_point.path().join(file_name)).unwrap(); + new_file + .read_exact_at(&mut buf, 0) + .expect("should be OK as result is cached"); + assert_eq!( + buf, + &old_contents[..buf.len()], + "bytes read should be old object from cache" + ); + } + + // Open the file w/ O_DIRECT, which should see the new file on S3 despite the old file being cached + let mut buf = [0u8; OBJECT_SIZE]; + let new_file = OpenOptions::new() + .read(true) + .custom_flags(libc::O_DIRECT) + .open(mount_point.path().join(file_name)) + .unwrap(); + new_file + .read_exact_at(&mut buf, 0) + .expect("should be able to read file content from S3"); + assert_eq!( + buf, + &new_contents[..buf.len()], + "bytes read should be new bytes from S3 client" + ); + } + + #[test_case(""; "no prefix")] + #[test_case("cache_and_direct_io_test_mock"; "prefix")] + fn cache_and_direct_io_test_mock(prefix: &str) { + cache_and_direct_io_test( + crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + prefix, + ); + } + + #[cfg(feature = "s3_tests")] + #[test] + fn cache_and_direct_io_test_s3() { + cache_and_direct_io_test( + crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + "cache_and_direct_io_test_s3", + ); + } +} From 159c92c81c7209e26988da2b61d582ebb7c2c7f0 Mon Sep 17 00:00:00 2001 From: Alessandro Passaro Date: Tue, 21 Nov 2023 17:32:07 +0000 Subject: [PATCH 2/4] Re-organize integration test modules to be able to run O_DIRECT tests serially Signed-off-by: Alessandro Passaro --- Cargo.lock | 36 ++ mountpoint-s3/Cargo.toml | 1 + mountpoint-s3/tests/common/fuse.rs | 546 +++++++++++++++++ mountpoint-s3/tests/common/mod.rs | 5 + mountpoint-s3/tests/direct_io.rs | 104 ++++ .../tests/fuse_tests/consistency_test.rs | 106 +--- mountpoint-s3/tests/fuse_tests/fork_test.rs | 2 +- mountpoint-s3/tests/fuse_tests/lookup_test.rs | 18 +- mountpoint-s3/tests/fuse_tests/mkdir_test.rs | 6 +- mountpoint-s3/tests/fuse_tests/mod.rs | 547 ------------------ mountpoint-s3/tests/fuse_tests/perm_test.rs | 10 +- .../tests/fuse_tests/prefetch_test.rs | 23 +- mountpoint-s3/tests/fuse_tests/read_test.rs | 25 +- .../tests/fuse_tests/readdir_test.rs | 10 +- mountpoint-s3/tests/fuse_tests/rmdir_test.rs | 16 +- .../tests/fuse_tests/semantics_doc_test.rs | 14 +- .../tests/fuse_tests/setattr_test.rs | 6 +- mountpoint-s3/tests/fuse_tests/unlink_test.rs | 18 +- mountpoint-s3/tests/fuse_tests/write_test.rs | 48 +- 19 files changed, 789 insertions(+), 752 deletions(-) create mode 100644 mountpoint-s3/tests/common/fuse.rs create mode 100644 mountpoint-s3/tests/direct_io.rs diff --git a/Cargo.lock b/Cargo.lock index 9d031daa4..abb7e5feb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1954,6 +1954,7 @@ dependencies = [ "regex", "serde", "serde_json", + "serial_test", "sha2", "shuttle", "supports-color", @@ -2193,6 +2194,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + [[package]] name = "parking_lot_core" version = "0.9.8" @@ -2832,6 +2843,31 @@ dependencies = [ "serde", ] +[[package]] +name = "serial_test" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e56dd856803e253c8f298af3f4d7eb0ae5e23a737252cd90bb4f3b435033b2d" +dependencies = [ + "dashmap", + "futures", + "lazy_static", + "log", + "parking_lot", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" +dependencies = [ + "proc-macro2 1.0.63", + "quote 1.0.29", + "syn 2.0.28", +] + [[package]] name = "sha1" version = "0.10.5" diff --git a/mountpoint-s3/Cargo.toml b/mountpoint-s3/Cargo.toml index 56c1649b2..08153aa69 100644 --- a/mountpoint-s3/Cargo.toml +++ b/mountpoint-s3/Cargo.toml @@ -60,6 +60,7 @@ proptest = "1.0.0" proptest-derive = "0.3.0" rand = "0.8.5" rand_chacha = "0.3.1" +serial_test = "2.0.0" sha2 = "0.10.6" shuttle = { version = "0.5.0" } tempfile = "3.4.0" diff --git a/mountpoint-s3/tests/common/fuse.rs b/mountpoint-s3/tests/common/fuse.rs new file mode 100644 index 000000000..b39c09c56 --- /dev/null +++ b/mountpoint-s3/tests/common/fuse.rs @@ -0,0 +1,546 @@ +use std::ffi::OsStr; +use std::fs::ReadDir; +use std::path::Path; +use std::sync::Arc; + +use aws_sdk_s3::primitives::ByteStream; +use aws_sdk_sts::config::Region; +use fuser::{BackgroundSession, MountOption, Session}; +use futures::Future; +use mountpoint_s3::data_cache::DataCache; +use mountpoint_s3::fuse::S3FuseFilesystem; +use mountpoint_s3::prefetch::{Prefetch, PrefetcherConfig}; +use mountpoint_s3::prefix::Prefix; +use mountpoint_s3::S3FilesystemConfig; +use mountpoint_s3_client::types::PutObjectParams; +use mountpoint_s3_client::ObjectClient; +use rand::RngCore; +use rand_chacha::rand_core::OsRng; +use tempfile::TempDir; + +pub trait TestClient: Send { + fn put_object(&mut self, key: &str, value: &[u8]) -> Result<(), Box> { + self.put_object_params(key, value, PutObjectParams::default()) + } + + fn put_object_params( + &mut self, + key: &str, + value: &[u8], + params: PutObjectParams, + ) -> Result<(), Box>; + + fn remove_object(&mut self, key: &str) -> Result<(), Box>; + + fn contains_dir(&self, key: &str) -> Result>; + + fn contains_key(&self, key: &str) -> Result>; + + fn is_upload_in_progress(&self, key: &str) -> Result>; + + fn get_object_storage_class(&self, key: &str) -> Result, Box>; + + fn restore_object(&mut self, key: &str, expedited: bool) -> Result<(), Box>; + + fn is_object_restored(&mut self, key: &str) -> Result>; +} + +pub type TestClientBox = Box; + +pub struct TestSessionConfig { + pub part_size: usize, + pub filesystem_config: S3FilesystemConfig, + pub prefetcher_config: PrefetcherConfig, +} + +impl Default for TestSessionConfig { + fn default() -> Self { + Self { + part_size: 8 * 1024 * 1024, + filesystem_config: Default::default(), + prefetcher_config: Default::default(), + } + } +} + +fn create_fuse_session( + client: Client, + prefetcher: Prefetcher, + bucket: &str, + prefix: &str, + mount_dir: &Path, + filesystem_config: S3FilesystemConfig, +) -> BackgroundSession +where + Client: ObjectClient + Send + Sync + 'static, + Prefetcher: Prefetch + Send + Sync + 'static, +{ + let options = vec![ + MountOption::DefaultPermissions, + MountOption::FSName("mountpoint-s3".to_string()), + MountOption::NoAtime, + MountOption::AutoUnmount, + MountOption::AllowOther, + ]; + + let prefix = Prefix::new(prefix).expect("valid prefix"); + let session = Session::new( + S3FuseFilesystem::new(client, prefetcher, bucket, &prefix, filesystem_config), + mount_dir, + &options, + ) + .unwrap(); + + BackgroundSession::new(session).unwrap() +} + +pub mod mock_session { + use super::*; + + use futures::executor::ThreadPool; + use mountpoint_s3::prefetch::{caching_prefetch, default_prefetch}; + use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockObject}; + + /// Create a FUSE mount backed by a mock object client that does not talk to S3 + pub fn new(test_name: &str, test_config: TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox) { + let mount_dir = tempfile::tempdir().unwrap(); + + let bucket = "test_bucket"; + let prefix = if test_name.is_empty() { + test_name.to_string() + } else { + format!("{test_name}/") + }; + + let client_config = MockClientConfig { + bucket: bucket.to_string(), + part_size: test_config.part_size, + }; + let client = Arc::new(MockClient::new(client_config)); + let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); + let prefetcher = default_prefetch(runtime, test_config.prefetcher_config); + let session = create_fuse_session( + client.clone(), + prefetcher, + bucket, + &prefix, + mount_dir.path(), + test_config.filesystem_config, + ); + let test_client = create_test_client(client, &prefix); + + (mount_dir, session, test_client) + } + + /// Create a FUSE mount backed by a mock object client, with caching, that does not talk to S3 + pub fn new_with_cache( + cache: Cache, + ) -> impl FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox) + where + Cache: DataCache + Send + Sync + 'static, + { + |test_name, test_config| { + let mount_dir = tempfile::tempdir().unwrap(); + + let bucket = "test_bucket"; + let prefix = if test_name.is_empty() { + test_name.to_string() + } else { + format!("{test_name}/") + }; + + let client_config = MockClientConfig { + bucket: bucket.to_string(), + part_size: test_config.part_size, + }; + let client = Arc::new(MockClient::new(client_config)); + let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); + let prefetcher = caching_prefetch(cache, runtime, test_config.prefetcher_config); + let session = create_fuse_session( + client.clone(), + prefetcher, + bucket, + &prefix, + mount_dir.path(), + test_config.filesystem_config, + ); + let test_client = create_test_client(client, &prefix); + + (mount_dir, session, test_client) + } + } + + fn create_test_client(client: Arc, prefix: &str) -> TestClientBox { + let test_client = MockTestClient { + prefix: prefix.to_owned(), + client, + }; + + Box::new(test_client) + } + + struct MockTestClient { + prefix: String, + client: Arc, + } + + impl TestClient for MockTestClient { + fn put_object_params( + &mut self, + key: &str, + value: &[u8], + params: PutObjectParams, + ) -> Result<(), Box> { + let full_key = format!("{}{}", self.prefix, key); + let mut mock_object = MockObject::from(value); + mock_object.set_storage_class(params.storage_class); + self.client.add_object(&full_key, mock_object); + Ok(()) + } + + fn remove_object(&mut self, key: &str) -> Result<(), Box> { + let full_key = format!("{}{}", self.prefix, key); + self.client.remove_object(&full_key); + Ok(()) + } + + fn contains_dir(&self, key: &str) -> Result> { + let full_key = format!("{}{}", self.prefix, key); + Ok(self.client.contains_prefix(&full_key)) + } + + fn contains_key(&self, key: &str) -> Result> { + let full_key = format!("{}{}", self.prefix, key); + Ok(self.client.contains_key(&full_key)) + } + + fn is_upload_in_progress(&self, key: &str) -> Result> { + let full_key = format!("{}{}", self.prefix, key); + Ok(self.client.is_upload_in_progress(&full_key)) + } + + fn get_object_storage_class(&self, key: &str) -> Result, Box> { + let full_key = format!("{}{}", self.prefix, key); + self.client + .get_object_storage_class(&full_key) + .map_err(|e| Box::new(e) as Box) + } + + fn restore_object(&mut self, key: &str, _expedited: bool) -> Result<(), Box> { + let full_key = format!("{}{}", self.prefix, key); + self.client + .restore_object(&full_key) + .map_err(|e| Box::new(e) as Box) + } + + fn is_object_restored(&mut self, key: &str) -> Result> { + let full_key = format!("{}{}", self.prefix, key); + self.client + .is_object_restored(&full_key) + .map_err(|e| Box::new(e) as Box) + } + } +} + +#[cfg(feature = "s3_tests")] +pub mod s3_session { + use super::*; + + use std::future::Future; + + use aws_sdk_s3::config::Region; + use aws_sdk_s3::operation::head_object::HeadObjectError; + use aws_sdk_s3::primitives::ByteStream; + use aws_sdk_s3::types::{ChecksumAlgorithm, GlacierJobParameters, RestoreRequest, Tier}; + use aws_sdk_s3::Client; + use mountpoint_s3::prefetch::{caching_prefetch, default_prefetch}; + use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig}; + use mountpoint_s3_client::S3CrtClient; + + /// Create a FUSE mount backed by a real S3 client + pub fn new(test_name: &str, test_config: TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox) { + let mount_dir = tempfile::tempdir().unwrap(); + + let (bucket, prefix) = get_test_bucket_and_prefix(test_name); + let region = get_test_region(); + + let client_config = S3ClientConfig::default() + .part_size(test_config.part_size) + .endpoint_config(EndpointConfig::new(®ion)); + let client = S3CrtClient::new(client_config).unwrap(); + let runtime = client.event_loop_group(); + let prefetcher = default_prefetch(runtime, test_config.prefetcher_config); + let session = create_fuse_session( + client, + prefetcher, + &bucket, + &prefix, + mount_dir.path(), + test_config.filesystem_config, + ); + let test_client = create_test_client(®ion, &bucket, &prefix); + + (mount_dir, session, test_client) + } + + /// Create a FUSE mount backed by a real S3 client, with caching + pub fn new_with_cache( + cache: Cache, + ) -> impl FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox) + where + Cache: DataCache + Send + Sync + 'static, + { + |test_name, test_config| { + let mount_dir = tempfile::tempdir().unwrap(); + + let (bucket, prefix) = get_test_bucket_and_prefix(test_name); + let region = get_test_region(); + + let client_config = S3ClientConfig::default() + .part_size(test_config.part_size) + .endpoint_config(EndpointConfig::new(®ion)); + let client = S3CrtClient::new(client_config).unwrap(); + let runtime = client.event_loop_group(); + let prefetcher = caching_prefetch(cache, runtime, test_config.prefetcher_config); + let session = create_fuse_session( + client, + prefetcher, + &bucket, + &prefix, + mount_dir.path(), + test_config.filesystem_config, + ); + let test_client = create_test_client(®ion, &bucket, &prefix); + + (mount_dir, session, test_client) + } + } + + fn create_test_client(region: &str, bucket: &str, prefix: &str) -> TestClientBox { + let sdk_client = tokio_block_on(async { get_test_sdk_client(region).await }); + let test_client = SDKTestClient { + prefix: prefix.to_owned(), + bucket: bucket.to_owned(), + sdk_client, + }; + + Box::new(test_client) + } + + async fn get_test_sdk_client(region: &str) -> aws_sdk_s3::Client { + let config = aws_config::from_env() + .region(Region::new(region.to_string())) + .load() + .await; + aws_sdk_s3::Client::new(&config) + } + + fn tokio_block_on(future: F) -> F::Output { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .unwrap(); + runtime.block_on(future) + } + + struct SDKTestClient { + prefix: String, + bucket: String, + sdk_client: Client, + } + + impl TestClient for SDKTestClient { + fn put_object_params( + &mut self, + key: &str, + value: &[u8], + params: PutObjectParams, + ) -> Result<(), Box> { + let full_key = format!("{}{}", self.prefix, key); + let mut request = self + .sdk_client + .put_object() + .bucket(&self.bucket) + .key(full_key) + .body(ByteStream::from(value.to_vec())); + if let Some(storage_class) = params.storage_class { + request = request.set_storage_class(Some(storage_class.as_str().into())); + } + if params.trailing_checksums { + request = request.set_checksum_algorithm(Some(ChecksumAlgorithm::Crc32C)); + } + tokio_block_on(request.send()) + .map(|_| ()) + .map_err(|e| Box::new(e) as Box) + } + + fn remove_object(&mut self, key: &str) -> Result<(), Box> { + let full_key = format!("{}{}", self.prefix, key); + tokio_block_on( + self.sdk_client + .delete_object() + .bucket(&self.bucket) + .key(full_key) + .send(), + ) + .map(|_| ()) + .map_err(|e| Box::new(e) as Box) + } + + fn contains_dir(&self, key: &str) -> Result> { + let full_key_suffixed = format!("{}{}/", self.prefix, key); + tokio_block_on( + self.sdk_client + .list_objects_v2() + .bucket(&self.bucket) + .delimiter('/') + .prefix(full_key_suffixed) + .send(), + ) + .map(|output| { + let len = output.contents().map(|c| c.len()).unwrap_or_default() + + output.common_prefixes().map(|c| c.len()).unwrap_or_default(); + len > 0 + }) + .map_err(|e| Box::new(e) as Box) + } + + fn contains_key(&self, key: &str) -> Result> { + let full_key = format!("{}{}", self.prefix, key); + let result = tokio_block_on(self.sdk_client.head_object().bucket(&self.bucket).key(full_key).send()); + match result { + Ok(_) => Ok(true), + Err(e) => match e.into_service_error() { + HeadObjectError::NotFound(_) => Ok(false), + err => Err(Box::new(err) as Box), + }, + } + } + + fn is_upload_in_progress(&self, key: &str) -> Result> { + let full_key = format!("{}{}", self.prefix, key); + tokio_block_on( + self.sdk_client + .list_multipart_uploads() + .bucket(&self.bucket) + .prefix(full_key) + .send(), + ) + .map(|output| output.uploads().map_or(0, |u| u.len()) > 0) + .map_err(|e| Box::new(e) as Box) + } + + fn get_object_storage_class(&self, key: &str) -> Result, Box> { + let full_key = format!("{}{}", self.prefix, key); + tokio_block_on( + self.sdk_client + .get_object_attributes() + .bucket(&self.bucket) + .key(full_key) + .object_attributes(aws_sdk_s3::types::ObjectAttributes::StorageClass) + .send(), + ) + .map(|output| output.storage_class().map(|s| s.as_str().to_string())) + .map_err(|e| Box::new(e) as Box) + } + + // Schudule restoration of an object, do not wait until completion. Expidited restoration completes within 1-5 min for GLACIER and is not available for DEEP_ARCHIVE. + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/restoring-objects-retrieval-options.html?icmpid=docs_amazons3_console#restoring-objects-upgrade-tier + fn restore_object(&mut self, key: &str, expedited: bool) -> Result<(), Box> { + let full_key = format!("{}{}", self.prefix, key); + let tier = if expedited { Tier::Expedited } else { Tier::Bulk }; + tokio_block_on( + self.sdk_client + .restore_object() + .bucket(&self.bucket) + .key(full_key) + .set_restore_request(Some( + RestoreRequest::builder() + .set_days(Some(1)) + .set_glacier_job_parameters(Some( + GlacierJobParameters::builder().set_tier(Some(tier)).build(), + )) + .build(), + )) + .send(), + ) + .map(|_| ()) + .map_err(|e| Box::new(e) as Box) + } + + fn is_object_restored(&mut self, key: &str) -> Result> { + let full_key = format!("{}{}", self.prefix, key); + tokio_block_on(self.sdk_client.head_object().bucket(&self.bucket).key(full_key).send()) + .map(|output| output.restore().unwrap().contains("ongoing-request=\"false\"")) + .map_err(|e| Box::new(e) as Box) + } + } +} + +/// Take a `read_dir` iterator and return the entry names +pub fn read_dir_to_entry_names(read_dir_iter: ReadDir) -> Vec { + read_dir_iter + .map(|entry| { + let entry = entry.expect("no io err during readdir"); + let entry_path = entry.path(); + let name = entry_path + .file_name() + .and_then(OsStr::to_str) + .expect("path should end with valid unicode file or dir name"); + name.to_owned() + }) + .collect::>() +} + +pub fn get_test_bucket_and_prefix(test_name: &str) -> (String, String) { + let bucket = std::env::var("S3_BUCKET_NAME").expect("Set S3_BUCKET_NAME to run integration tests"); + + // Generate a random nonce to make sure this prefix is truly unique + let nonce = OsRng.next_u64(); + + // Prefix always has a trailing "/" to keep meaning in sync with the S3 API. + let prefix = std::env::var("S3_BUCKET_TEST_PREFIX").unwrap_or(String::from("mountpoint-test/")); + assert!(prefix.ends_with('/'), "S3_BUCKET_TEST_PREFIX should end in '/'"); + + let prefix = format!("{prefix}{test_name}/{nonce}/"); + + (bucket, prefix) +} + +pub fn get_test_bucket_forbidden() -> String { + std::env::var("S3_FORBIDDEN_BUCKET_NAME").expect("Set S3_FORBIDDEN_BUCKET_NAME to run integration tests") +} + +pub fn get_test_region() -> String { + std::env::var("S3_REGION").expect("Set S3_REGION to run integration tests") +} + +pub fn get_subsession_iam_role() -> String { + std::env::var("S3_SUBSESSION_IAM_ROLE").expect("Set S3_SUBSESSION_IAM_ROLE to run integration tests") +} + +pub fn create_objects(bucket: &str, prefix: &str, region: &str, key: &str, value: &[u8]) { + let config = tokio_block_on(aws_config::from_env().region(Region::new(region.to_string())).load()); + let sdk_client = aws_sdk_s3::Client::new(&config); + let full_key = format!("{prefix}{key}"); + tokio_block_on(async move { + sdk_client + .put_object() + .bucket(bucket) + .key(full_key) + .body(ByteStream::from(value.to_vec())) + .send() + .await + .unwrap() + }); +} + +pub fn tokio_block_on(future: F) -> F::Output { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .unwrap(); + runtime.block_on(future) +} diff --git a/mountpoint-s3/tests/common/mod.rs b/mountpoint-s3/tests/common/mod.rs index a33158c1e..8ae345fc0 100644 --- a/mountpoint-s3/tests/common/mod.rs +++ b/mountpoint-s3/tests/common/mod.rs @@ -1,3 +1,8 @@ +#![allow(unused)] + +#[cfg(feature = "fuse_tests")] +pub mod fuse; + use fuser::{FileAttr, FileType}; use futures::executor::ThreadPool; use mountpoint_s3::fs::{self, DirectoryEntry, DirectoryReplier, ReadReplier, ToErrno}; diff --git a/mountpoint-s3/tests/direct_io.rs b/mountpoint-s3/tests/direct_io.rs new file mode 100644 index 000000000..faee95f2c --- /dev/null +++ b/mountpoint-s3/tests/direct_io.rs @@ -0,0 +1,104 @@ +//! O_DIRECT tests are grouped in a separate integration test module so they can be +//! run serially. This is because O_DIRECT has undefined behavior if run concurrently +//! with a fork, and FUSE tests involve a fork to spawn fusermount. + +#![cfg(target_os = "linux")] +#![cfg(feature = "s3_tests")] + +mod common; + +use std::fs::File; +use std::os::unix::fs::{FileExt, OpenOptionsExt}; +use std::{fs::OpenOptions, time::Duration}; + +use crate::common::fuse::{self, TestClientBox, TestSessionConfig}; +use fuser::BackgroundSession; +use mountpoint_s3::data_cache::InMemoryDataCache; +use mountpoint_s3::fs::{CacheConfig, S3FilesystemConfig}; +use serial_test::serial; +use tempfile::TempDir; +use test_case::test_case; + +fn cache_and_direct_io_test(creator_fn: F, prefix: &str) +where + F: FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox), +{ + const OBJECT_SIZE: usize = 8; + + let test_session_conf = TestSessionConfig { + filesystem_config: S3FilesystemConfig { + cache_config: CacheConfig { + serve_lookup_from_cache: true, + dir_ttl: Duration::from_secs(600), + file_ttl: Duration::from_secs(600), + }, + ..Default::default() + }, + ..Default::default() + }; + let (mount_point, _session, mut test_client) = creator_fn(prefix, test_session_conf); + + let file_name = "file.bin"; + + // Create the first version of the file + let old_contents = vec![0x0fu8; OBJECT_SIZE]; + test_client.put_object(file_name, &old_contents).unwrap(); + + // Open and read fully the file before updating it remotely + let old_file = File::open(mount_point.path().join(file_name)).unwrap(); + let mut buf = vec![0u8; OBJECT_SIZE]; + old_file.read_exact_at(&mut buf, 0).unwrap(); + assert_eq!(buf, &old_contents[..buf.len()]); + + let new_contents = vec![0xffu8; OBJECT_SIZE]; + test_client.put_object(file_name, &new_contents).unwrap(); + + // Open the file again, which should be reading from cache + for _ in 0..2 { + let new_file = File::open(mount_point.path().join(file_name)).unwrap(); + new_file + .read_exact_at(&mut buf, 0) + .expect("should be OK as result is cached"); + assert_eq!( + buf, + &old_contents[..buf.len()], + "bytes read should be old object from cache" + ); + } + + // Open the file w/ O_DIRECT, which should see the new file on S3 despite the old file being cached + let mut buf = vec![0u8; OBJECT_SIZE]; + let new_file = OpenOptions::new() + .read(true) + .custom_flags(libc::O_DIRECT) + .open(mount_point.path().join(file_name)) + .unwrap(); + new_file + .read_exact_at(&mut buf, 0) + .expect("should be able to read file content from S3"); + assert_eq!( + buf, + &new_contents[..buf.len()], + "bytes read should be new bytes from S3 client" + ); +} + +#[test_case(""; "no prefix")] +#[test_case("cache_and_direct_io_test_mock"; "prefix")] +#[serial] +fn cache_and_direct_io_test_mock(prefix: &str) { + cache_and_direct_io_test( + fuse::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + prefix, + ); +} + +#[cfg(feature = "s3_tests")] +#[test] +#[serial] +fn cache_and_direct_io_test_s3() { + cache_and_direct_io_test( + fuse::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + "cache_and_direct_io_test_s3", + ); +} diff --git a/mountpoint-s3/tests/fuse_tests/consistency_test.rs b/mountpoint-s3/tests/fuse_tests/consistency_test.rs index ffc1b08ea..07a500434 100644 --- a/mountpoint-s3/tests/fuse_tests/consistency_test.rs +++ b/mountpoint-s3/tests/fuse_tests/consistency_test.rs @@ -5,10 +5,9 @@ use fuser::BackgroundSession; use tempfile::TempDir; use test_case::test_case; +use crate::common::fuse::{self, TestClientBox, TestSessionConfig}; use mountpoint_s3::data_cache::InMemoryDataCache; -use crate::fuse_tests::{TestClientBox, TestSessionConfig}; - fn page_cache_sharing_test(creator_fn: F, prefix: &str) where F: FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox), @@ -57,14 +56,14 @@ where #[cfg(feature = "s3_tests")] #[test] fn page_cache_sharing_test_s3() { - page_cache_sharing_test(crate::fuse_tests::s3_session::new, "page_cache_sharing_test"); + page_cache_sharing_test(fuse::s3_session::new, "page_cache_sharing_test"); } #[cfg(feature = "s3_tests")] #[test] fn page_cache_sharing_test_s3_with_cache() { page_cache_sharing_test( - crate::fuse_tests::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + fuse::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), "page_cache_sharing_test", ); } @@ -72,109 +71,14 @@ fn page_cache_sharing_test_s3_with_cache() { #[test_case(""; "no prefix")] #[test_case("page_cache_sharing_test"; "prefix")] fn page_cache_sharing_test_mock(prefix: &str) { - page_cache_sharing_test(crate::fuse_tests::mock_session::new, prefix); + page_cache_sharing_test(fuse::mock_session::new, prefix); } #[test_case(""; "no prefix")] #[test_case("page_cache_sharing_test"; "prefix")] fn page_cache_sharing_test_mock_with_cache(prefix: &str) { page_cache_sharing_test( - crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + fuse::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), prefix, ); } - -#[cfg(target_os = "linux")] -mod direct_io { - use super::*; - - use std::fs::OpenOptions; - use std::os::unix::fs::OpenOptionsExt; - use std::time::Duration; - - use test_case::test_case; - - use mountpoint_s3::fs::{CacheConfig, S3FilesystemConfig}; - - fn cache_and_direct_io_test(creator_fn: F, prefix: &str) - where - F: FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox), - { - const OBJECT_SIZE: usize = 8; - - let test_session_conf = TestSessionConfig { - filesystem_config: S3FilesystemConfig { - cache_config: CacheConfig { - serve_lookup_from_cache: true, - dir_ttl: Duration::from_secs(600), - file_ttl: Duration::from_secs(600), - }, - ..Default::default() - }, - ..Default::default() - }; - let (mount_point, _session, mut test_client) = creator_fn(prefix, test_session_conf); - - let file_name = "file.bin"; - - // Create the first version of the file - let old_contents = vec![0x0fu8; OBJECT_SIZE]; - test_client.put_object(file_name, &old_contents).unwrap(); - - // Open and read fully the file before updating it remotely - let old_file = File::open(mount_point.path().join(file_name)).unwrap(); - let mut buf = vec![0u8; OBJECT_SIZE]; - old_file.read_exact_at(&mut buf, 0).unwrap(); - assert_eq!(buf, &old_contents[..buf.len()]); - - let new_contents = vec![0xffu8; OBJECT_SIZE]; - test_client.put_object(file_name, &new_contents).unwrap(); - - // Open the file again, which should be reading from cache - for _ in 0..2 { - let new_file = File::open(mount_point.path().join(file_name)).unwrap(); - new_file - .read_exact_at(&mut buf, 0) - .expect("should be OK as result is cached"); - assert_eq!( - buf, - &old_contents[..buf.len()], - "bytes read should be old object from cache" - ); - } - - // Open the file w/ O_DIRECT, which should see the new file on S3 despite the old file being cached - let mut buf = [0u8; OBJECT_SIZE]; - let new_file = OpenOptions::new() - .read(true) - .custom_flags(libc::O_DIRECT) - .open(mount_point.path().join(file_name)) - .unwrap(); - new_file - .read_exact_at(&mut buf, 0) - .expect("should be able to read file content from S3"); - assert_eq!( - buf, - &new_contents[..buf.len()], - "bytes read should be new bytes from S3 client" - ); - } - - #[test_case(""; "no prefix")] - #[test_case("cache_and_direct_io_test_mock"; "prefix")] - fn cache_and_direct_io_test_mock(prefix: &str) { - cache_and_direct_io_test( - crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), - prefix, - ); - } - - #[cfg(feature = "s3_tests")] - #[test] - fn cache_and_direct_io_test_s3() { - cache_and_direct_io_test( - crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), - "cache_and_direct_io_test_s3", - ); - } -} diff --git a/mountpoint-s3/tests/fuse_tests/fork_test.rs b/mountpoint-s3/tests/fuse_tests/fork_test.rs index ba19797a7..14e88e5b0 100644 --- a/mountpoint-s3/tests/fuse_tests/fork_test.rs +++ b/mountpoint-s3/tests/fuse_tests/fork_test.rs @@ -9,7 +9,7 @@ use std::process::Stdio; use std::{path::PathBuf, process::Command}; use test_case::test_case; -use crate::fuse_tests::{ +use crate::common::fuse::{ create_objects, get_subsession_iam_role, get_test_bucket_and_prefix, get_test_bucket_forbidden, get_test_region, read_dir_to_entry_names, tokio_block_on, }; diff --git a/mountpoint-s3/tests/fuse_tests/lookup_test.rs b/mountpoint-s3/tests/fuse_tests/lookup_test.rs index 30b0e21e7..bcdab723d 100644 --- a/mountpoint-s3/tests/fuse_tests/lookup_test.rs +++ b/mountpoint-s3/tests/fuse_tests/lookup_test.rs @@ -10,7 +10,7 @@ use mountpoint_s3::{fs::CacheConfig, S3FilesystemConfig}; use tempfile::TempDir; use test_case::test_case; -use crate::fuse_tests::{read_dir_to_entry_names, TestClientBox, TestSessionConfig}; +use crate::common::fuse::{self, read_dir_to_entry_names, TestClientBox, TestSessionConfig}; /// See [mountpoint_s3::inode::tests::test_lookup_directory_overlap]. fn lookup_directory_overlap_test(creator_fn: F, prefix: &str, subdir: &str) @@ -40,7 +40,7 @@ where #[test_case(""; "no subdirectory")] #[test_case("subdir/"; "with subdirectory")] fn lookup_directory_overlap_test_s3(subdir: &str) { - lookup_directory_overlap_test(crate::fuse_tests::s3_session::new, "lookup_dirrectory_overlap", subdir); + lookup_directory_overlap_test(fuse::s3_session::new, "lookup_dirrectory_overlap", subdir); } #[test_case("", ""; "no prefix no subdirectory")] @@ -48,7 +48,7 @@ fn lookup_directory_overlap_test_s3(subdir: &str) { #[test_case("", "subdir/"; "no prefix subdirectory")] #[test_case("lookup_dirrectory_overlap", "subdir/"; "prefix subdirectory")] fn lookup_directory_overlap_test_mock(prefix: &str, subdir: &str) { - lookup_directory_overlap_test(crate::fuse_tests::mock_session::new, prefix, subdir); + lookup_directory_overlap_test(fuse::mock_session::new, prefix, subdir); } fn lookup_weird_characters_test(creator_fn: F, prefix: &str) @@ -97,12 +97,12 @@ where #[cfg(feature = "s3_tests")] #[test] fn lookup_directory_weird_characters_s3() { - lookup_weird_characters_test(crate::fuse_tests::s3_session::new, "lookup_weird_characters_test"); + lookup_weird_characters_test(fuse::s3_session::new, "lookup_weird_characters_test"); } #[test] fn lookup_directory_weird_characters_mock() { - lookup_weird_characters_test(crate::fuse_tests::mock_session::new, "lookup_weird_characters_test"); + lookup_weird_characters_test(fuse::mock_session::new, "lookup_weird_characters_test"); } fn lookup_previously_shadowed_file_test(creator_fn: F) @@ -143,12 +143,12 @@ where #[cfg(feature = "s3_tests")] #[test] fn lookup_previously_shadowed_file_test_s3() { - lookup_previously_shadowed_file_test(crate::fuse_tests::s3_session::new); + lookup_previously_shadowed_file_test(fuse::s3_session::new); } #[test] fn lookup_previously_shadowed_file_test_mock() { - lookup_previously_shadowed_file_test(crate::fuse_tests::mock_session::new); + lookup_previously_shadowed_file_test(fuse::mock_session::new); } fn lookup_unicode_keys_test(creator_fn: F, prefix: &str) @@ -191,10 +191,10 @@ where #[cfg(feature = "s3_tests")] #[test] fn lookup_unicode_keys_s3() { - lookup_unicode_keys_test(crate::fuse_tests::s3_session::new, "lookup_unicode_keys_test"); + lookup_unicode_keys_test(fuse::s3_session::new, "lookup_unicode_keys_test"); } #[test] fn lookup_unicode_keys_mock() { - lookup_unicode_keys_test(crate::fuse_tests::mock_session::new, "lookup_unicode_keys_test"); + lookup_unicode_keys_test(fuse::mock_session::new, "lookup_unicode_keys_test"); } diff --git a/mountpoint-s3/tests/fuse_tests/mkdir_test.rs b/mountpoint-s3/tests/fuse_tests/mkdir_test.rs index bf2c69b95..2117a3941 100644 --- a/mountpoint-s3/tests/fuse_tests/mkdir_test.rs +++ b/mountpoint-s3/tests/fuse_tests/mkdir_test.rs @@ -4,7 +4,7 @@ use fuser::BackgroundSession; use tempfile::TempDir; use test_case::test_case; -use crate::fuse_tests::{read_dir_to_entry_names, TestClientBox, TestSessionConfig}; +use crate::common::fuse::{self, read_dir_to_entry_names, TestClientBox, TestSessionConfig}; fn mkdir_test(creator_fn: F, prefix: &str) where @@ -43,11 +43,11 @@ where #[cfg(feature = "s3_tests")] #[test] fn mkdir_test_s3() { - mkdir_test(crate::fuse_tests::s3_session::new, "mkdir_test"); + mkdir_test(fuse::s3_session::new, "mkdir_test"); } #[test_case(""; "unprefixed")] #[test_case("test_prefix/"; "prefixed")] fn mkdir_test_mock(prefix: &str) { - mkdir_test(crate::fuse_tests::mock_session::new, prefix); + mkdir_test(fuse::mock_session::new, prefix); } diff --git a/mountpoint-s3/tests/fuse_tests/mod.rs b/mountpoint-s3/tests/fuse_tests/mod.rs index 551839fcb..2b24c0d1e 100644 --- a/mountpoint-s3/tests/fuse_tests/mod.rs +++ b/mountpoint-s3/tests/fuse_tests/mod.rs @@ -11,550 +11,3 @@ mod semantics_doc_test; mod setattr_test; mod unlink_test; mod write_test; - -use std::ffi::OsStr; -use std::fs::ReadDir; -use std::path::Path; -use std::sync::Arc; - -use aws_sdk_s3::primitives::ByteStream; -use aws_sdk_sts::config::Region; -use fuser::{BackgroundSession, MountOption, Session}; -use futures::Future; -use mountpoint_s3::data_cache::DataCache; -use mountpoint_s3::fuse::S3FuseFilesystem; -use mountpoint_s3::prefetch::{Prefetch, PrefetcherConfig}; -use mountpoint_s3::prefix::Prefix; -use mountpoint_s3::S3FilesystemConfig; -use mountpoint_s3_client::types::PutObjectParams; -use mountpoint_s3_client::ObjectClient; -use rand::RngCore; -use rand_chacha::rand_core::OsRng; -use tempfile::TempDir; - -pub trait TestClient: Send { - fn put_object(&mut self, key: &str, value: &[u8]) -> Result<(), Box> { - self.put_object_params(key, value, PutObjectParams::default()) - } - - fn put_object_params( - &mut self, - key: &str, - value: &[u8], - params: PutObjectParams, - ) -> Result<(), Box>; - - fn remove_object(&mut self, key: &str) -> Result<(), Box>; - - fn contains_dir(&self, key: &str) -> Result>; - - fn contains_key(&self, key: &str) -> Result>; - - fn is_upload_in_progress(&self, key: &str) -> Result>; - - fn get_object_storage_class(&self, key: &str) -> Result, Box>; - - fn restore_object(&mut self, key: &str, expedited: bool) -> Result<(), Box>; - - fn is_object_restored(&mut self, key: &str) -> Result>; -} - -pub type TestClientBox = Box; - -pub struct TestSessionConfig { - pub part_size: usize, - pub filesystem_config: S3FilesystemConfig, - pub prefetcher_config: PrefetcherConfig, -} - -impl Default for TestSessionConfig { - fn default() -> Self { - Self { - part_size: 8 * 1024 * 1024, - filesystem_config: Default::default(), - prefetcher_config: Default::default(), - } - } -} - -fn create_fuse_session( - client: Client, - prefetcher: Prefetcher, - bucket: &str, - prefix: &str, - mount_dir: &Path, - filesystem_config: S3FilesystemConfig, -) -> BackgroundSession -where - Client: ObjectClient + Send + Sync + 'static, - Prefetcher: Prefetch + Send + Sync + 'static, -{ - let options = vec![ - MountOption::DefaultPermissions, - MountOption::FSName("mountpoint-s3".to_string()), - MountOption::NoAtime, - MountOption::AutoUnmount, - MountOption::AllowOther, - ]; - - let prefix = Prefix::new(prefix).expect("valid prefix"); - let session = Session::new( - S3FuseFilesystem::new(client, prefetcher, bucket, &prefix, filesystem_config), - mount_dir, - &options, - ) - .unwrap(); - - BackgroundSession::new(session).unwrap() -} - -mod mock_session { - use super::*; - - use futures::executor::ThreadPool; - use mountpoint_s3::prefetch::{caching_prefetch, default_prefetch}; - use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockObject}; - - /// Create a FUSE mount backed by a mock object client that does not talk to S3 - pub fn new(test_name: &str, test_config: TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox) { - let mount_dir = tempfile::tempdir().unwrap(); - - let bucket = "test_bucket"; - let prefix = if test_name.is_empty() { - test_name.to_string() - } else { - format!("{test_name}/") - }; - - let client_config = MockClientConfig { - bucket: bucket.to_string(), - part_size: test_config.part_size, - }; - let client = Arc::new(MockClient::new(client_config)); - let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); - let prefetcher = default_prefetch(runtime, test_config.prefetcher_config); - let session = create_fuse_session( - client.clone(), - prefetcher, - bucket, - &prefix, - mount_dir.path(), - test_config.filesystem_config, - ); - let test_client = create_test_client(client, &prefix); - - (mount_dir, session, test_client) - } - - /// Create a FUSE mount backed by a mock object client, with caching, that does not talk to S3 - pub fn new_with_cache( - cache: Cache, - ) -> impl FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox) - where - Cache: DataCache + Send + Sync + 'static, - { - |test_name, test_config| { - let mount_dir = tempfile::tempdir().unwrap(); - - let bucket = "test_bucket"; - let prefix = if test_name.is_empty() { - test_name.to_string() - } else { - format!("{test_name}/") - }; - - let client_config = MockClientConfig { - bucket: bucket.to_string(), - part_size: test_config.part_size, - }; - let client = Arc::new(MockClient::new(client_config)); - let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); - let prefetcher = caching_prefetch(cache, runtime, test_config.prefetcher_config); - let session = create_fuse_session( - client.clone(), - prefetcher, - bucket, - &prefix, - mount_dir.path(), - test_config.filesystem_config, - ); - let test_client = create_test_client(client, &prefix); - - (mount_dir, session, test_client) - } - } - - fn create_test_client(client: Arc, prefix: &str) -> TestClientBox { - let test_client = MockTestClient { - prefix: prefix.to_owned(), - client, - }; - - Box::new(test_client) - } - - struct MockTestClient { - prefix: String, - client: Arc, - } - - impl TestClient for MockTestClient { - fn put_object_params( - &mut self, - key: &str, - value: &[u8], - params: PutObjectParams, - ) -> Result<(), Box> { - let full_key = format!("{}{}", self.prefix, key); - let mut mock_object = MockObject::from(value); - mock_object.set_storage_class(params.storage_class); - self.client.add_object(&full_key, mock_object); - Ok(()) - } - - fn remove_object(&mut self, key: &str) -> Result<(), Box> { - let full_key = format!("{}{}", self.prefix, key); - self.client.remove_object(&full_key); - Ok(()) - } - - fn contains_dir(&self, key: &str) -> Result> { - let full_key = format!("{}{}", self.prefix, key); - Ok(self.client.contains_prefix(&full_key)) - } - - fn contains_key(&self, key: &str) -> Result> { - let full_key = format!("{}{}", self.prefix, key); - Ok(self.client.contains_key(&full_key)) - } - - fn is_upload_in_progress(&self, key: &str) -> Result> { - let full_key = format!("{}{}", self.prefix, key); - Ok(self.client.is_upload_in_progress(&full_key)) - } - - fn get_object_storage_class(&self, key: &str) -> Result, Box> { - let full_key = format!("{}{}", self.prefix, key); - self.client - .get_object_storage_class(&full_key) - .map_err(|e| Box::new(e) as Box) - } - - fn restore_object(&mut self, key: &str, _expedited: bool) -> Result<(), Box> { - let full_key = format!("{}{}", self.prefix, key); - self.client - .restore_object(&full_key) - .map_err(|e| Box::new(e) as Box) - } - - fn is_object_restored(&mut self, key: &str) -> Result> { - let full_key = format!("{}{}", self.prefix, key); - self.client - .is_object_restored(&full_key) - .map_err(|e| Box::new(e) as Box) - } - } -} - -#[cfg(feature = "s3_tests")] -mod s3_session { - use super::*; - - use std::future::Future; - - use aws_sdk_s3::config::Region; - use aws_sdk_s3::operation::head_object::HeadObjectError; - use aws_sdk_s3::primitives::ByteStream; - use aws_sdk_s3::types::{ChecksumAlgorithm, GlacierJobParameters, RestoreRequest, Tier}; - use aws_sdk_s3::Client; - use mountpoint_s3::prefetch::{caching_prefetch, default_prefetch}; - use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig}; - use mountpoint_s3_client::S3CrtClient; - - /// Create a FUSE mount backed by a real S3 client - pub fn new(test_name: &str, test_config: TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox) { - let mount_dir = tempfile::tempdir().unwrap(); - - let (bucket, prefix) = get_test_bucket_and_prefix(test_name); - let region = get_test_region(); - - let client_config = S3ClientConfig::default() - .part_size(test_config.part_size) - .endpoint_config(EndpointConfig::new(®ion)); - let client = S3CrtClient::new(client_config).unwrap(); - let runtime = client.event_loop_group(); - let prefetcher = default_prefetch(runtime, test_config.prefetcher_config); - let session = create_fuse_session( - client, - prefetcher, - &bucket, - &prefix, - mount_dir.path(), - test_config.filesystem_config, - ); - let test_client = create_test_client(®ion, &bucket, &prefix); - - (mount_dir, session, test_client) - } - - /// Create a FUSE mount backed by a real S3 client, with caching - pub fn new_with_cache( - cache: Cache, - ) -> impl FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox) - where - Cache: DataCache + Send + Sync + 'static, - { - |test_name, test_config| { - let mount_dir = tempfile::tempdir().unwrap(); - - let (bucket, prefix) = get_test_bucket_and_prefix(test_name); - let region = get_test_region(); - - let client_config = S3ClientConfig::default() - .part_size(test_config.part_size) - .endpoint_config(EndpointConfig::new(®ion)); - let client = S3CrtClient::new(client_config).unwrap(); - let runtime = client.event_loop_group(); - let prefetcher = caching_prefetch(cache, runtime, test_config.prefetcher_config); - let session = create_fuse_session( - client, - prefetcher, - &bucket, - &prefix, - mount_dir.path(), - test_config.filesystem_config, - ); - let test_client = create_test_client(®ion, &bucket, &prefix); - - (mount_dir, session, test_client) - } - } - - fn create_test_client(region: &str, bucket: &str, prefix: &str) -> TestClientBox { - let sdk_client = tokio_block_on(async { get_test_sdk_client(region).await }); - let test_client = SDKTestClient { - prefix: prefix.to_owned(), - bucket: bucket.to_owned(), - sdk_client, - }; - - Box::new(test_client) - } - - async fn get_test_sdk_client(region: &str) -> aws_sdk_s3::Client { - let config = aws_config::from_env() - .region(Region::new(region.to_string())) - .load() - .await; - aws_sdk_s3::Client::new(&config) - } - - fn tokio_block_on(future: F) -> F::Output { - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_io() - .enable_time() - .build() - .unwrap(); - runtime.block_on(future) - } - - struct SDKTestClient { - prefix: String, - bucket: String, - sdk_client: Client, - } - - impl TestClient for SDKTestClient { - fn put_object_params( - &mut self, - key: &str, - value: &[u8], - params: PutObjectParams, - ) -> Result<(), Box> { - let full_key = format!("{}{}", self.prefix, key); - let mut request = self - .sdk_client - .put_object() - .bucket(&self.bucket) - .key(full_key) - .body(ByteStream::from(value.to_vec())); - if let Some(storage_class) = params.storage_class { - request = request.set_storage_class(Some(storage_class.as_str().into())); - } - if params.trailing_checksums { - request = request.set_checksum_algorithm(Some(ChecksumAlgorithm::Crc32C)); - } - tokio_block_on(request.send()) - .map(|_| ()) - .map_err(|e| Box::new(e) as Box) - } - - fn remove_object(&mut self, key: &str) -> Result<(), Box> { - let full_key = format!("{}{}", self.prefix, key); - tokio_block_on( - self.sdk_client - .delete_object() - .bucket(&self.bucket) - .key(full_key) - .send(), - ) - .map(|_| ()) - .map_err(|e| Box::new(e) as Box) - } - - fn contains_dir(&self, key: &str) -> Result> { - let full_key_suffixed = format!("{}{}/", self.prefix, key); - tokio_block_on( - self.sdk_client - .list_objects_v2() - .bucket(&self.bucket) - .delimiter('/') - .prefix(full_key_suffixed) - .send(), - ) - .map(|output| { - let len = output.contents().map(|c| c.len()).unwrap_or_default() - + output.common_prefixes().map(|c| c.len()).unwrap_or_default(); - len > 0 - }) - .map_err(|e| Box::new(e) as Box) - } - - fn contains_key(&self, key: &str) -> Result> { - let full_key = format!("{}{}", self.prefix, key); - let result = tokio_block_on(self.sdk_client.head_object().bucket(&self.bucket).key(full_key).send()); - match result { - Ok(_) => Ok(true), - Err(e) => match e.into_service_error() { - HeadObjectError::NotFound(_) => Ok(false), - err => Err(Box::new(err) as Box), - }, - } - } - - fn is_upload_in_progress(&self, key: &str) -> Result> { - let full_key = format!("{}{}", self.prefix, key); - tokio_block_on( - self.sdk_client - .list_multipart_uploads() - .bucket(&self.bucket) - .prefix(full_key) - .send(), - ) - .map(|output| output.uploads().map_or(0, |u| u.len()) > 0) - .map_err(|e| Box::new(e) as Box) - } - - fn get_object_storage_class(&self, key: &str) -> Result, Box> { - let full_key = format!("{}{}", self.prefix, key); - tokio_block_on( - self.sdk_client - .get_object_attributes() - .bucket(&self.bucket) - .key(full_key) - .object_attributes(aws_sdk_s3::types::ObjectAttributes::StorageClass) - .send(), - ) - .map(|output| output.storage_class().map(|s| s.as_str().to_string())) - .map_err(|e| Box::new(e) as Box) - } - - // Schudule restoration of an object, do not wait until completion. Expidited restoration completes within 1-5 min for GLACIER and is not available for DEEP_ARCHIVE. - // https://docs.aws.amazon.com/AmazonS3/latest/userguide/restoring-objects-retrieval-options.html?icmpid=docs_amazons3_console#restoring-objects-upgrade-tier - fn restore_object(&mut self, key: &str, expedited: bool) -> Result<(), Box> { - let full_key = format!("{}{}", self.prefix, key); - let tier = if expedited { Tier::Expedited } else { Tier::Bulk }; - tokio_block_on( - self.sdk_client - .restore_object() - .bucket(&self.bucket) - .key(full_key) - .set_restore_request(Some( - RestoreRequest::builder() - .set_days(Some(1)) - .set_glacier_job_parameters(Some( - GlacierJobParameters::builder().set_tier(Some(tier)).build(), - )) - .build(), - )) - .send(), - ) - .map(|_| ()) - .map_err(|e| Box::new(e) as Box) - } - - fn is_object_restored(&mut self, key: &str) -> Result> { - let full_key = format!("{}{}", self.prefix, key); - tokio_block_on(self.sdk_client.head_object().bucket(&self.bucket).key(full_key).send()) - .map(|output| output.restore().unwrap().contains("ongoing-request=\"false\"")) - .map_err(|e| Box::new(e) as Box) - } - } -} - -/// Take a `read_dir` iterator and return the entry names -pub fn read_dir_to_entry_names(read_dir_iter: ReadDir) -> Vec { - read_dir_iter - .map(|entry| { - let entry = entry.expect("no io err during readdir"); - let entry_path = entry.path(); - let name = entry_path - .file_name() - .and_then(OsStr::to_str) - .expect("path should end with valid unicode file or dir name"); - name.to_owned() - }) - .collect::>() -} - -pub fn get_test_bucket_and_prefix(test_name: &str) -> (String, String) { - let bucket = std::env::var("S3_BUCKET_NAME").expect("Set S3_BUCKET_NAME to run integration tests"); - - // Generate a random nonce to make sure this prefix is truly unique - let nonce = OsRng.next_u64(); - - // Prefix always has a trailing "/" to keep meaning in sync with the S3 API. - let prefix = std::env::var("S3_BUCKET_TEST_PREFIX").unwrap_or(String::from("mountpoint-test/")); - assert!(prefix.ends_with('/'), "S3_BUCKET_TEST_PREFIX should end in '/'"); - - let prefix = format!("{prefix}{test_name}/{nonce}/"); - - (bucket, prefix) -} - -pub fn get_test_bucket_forbidden() -> String { - std::env::var("S3_FORBIDDEN_BUCKET_NAME").expect("Set S3_FORBIDDEN_BUCKET_NAME to run integration tests") -} - -pub fn get_test_region() -> String { - std::env::var("S3_REGION").expect("Set S3_REGION to run integration tests") -} - -pub fn get_subsession_iam_role() -> String { - std::env::var("S3_SUBSESSION_IAM_ROLE").expect("Set S3_SUBSESSION_IAM_ROLE to run integration tests") -} - -pub fn create_objects(bucket: &str, prefix: &str, region: &str, key: &str, value: &[u8]) { - let config = tokio_block_on(aws_config::from_env().region(Region::new(region.to_string())).load()); - let sdk_client = aws_sdk_s3::Client::new(&config); - let full_key = format!("{prefix}{key}"); - tokio_block_on(async move { - sdk_client - .put_object() - .bucket(bucket) - .key(full_key) - .body(ByteStream::from(value.to_vec())) - .send() - .await - .unwrap() - }); -} - -pub fn tokio_block_on(future: F) -> F::Output { - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_io() - .enable_time() - .build() - .unwrap(); - runtime.block_on(future) -} diff --git a/mountpoint-s3/tests/fuse_tests/perm_test.rs b/mountpoint-s3/tests/fuse_tests/perm_test.rs index f9df884e5..4cebc2a8d 100644 --- a/mountpoint-s3/tests/fuse_tests/perm_test.rs +++ b/mountpoint-s3/tests/fuse_tests/perm_test.rs @@ -10,7 +10,7 @@ use nix::unistd::{getgid, getuid}; use tempfile::TempDir; use test_case::test_case; -use crate::fuse_tests::{read_dir_to_entry_names, TestClientBox, TestSessionConfig}; +use crate::common::fuse::{self, read_dir_to_entry_names, TestClientBox, TestSessionConfig}; fn perm_test(creator_fn: F, uid: Option, gid: Option, dir_mode: Option, file_mode: Option) where @@ -164,7 +164,7 @@ fn assert_perm(m: Metadata, uid: u32, gid: u32, perm: u32) { #[test_case(Some(500), Some(20), None, None; "non default gid and uid")] #[test_case(Some(500), Some(20), Some(0o555), Some(0o444); "non default gid, uid and permissions")] fn permission_config_test_s3(uid: Option, gid: Option, dir_mode: Option, file_mode: Option) { - perm_test(crate::fuse_tests::s3_session::new, uid, gid, dir_mode, file_mode); + perm_test(fuse::s3_session::new, uid, gid, dir_mode, file_mode); } #[test_case(None, None, None, None; "default config")] @@ -173,7 +173,7 @@ fn permission_config_test_s3(uid: Option, gid: Option, dir_mode: Optio #[test_case(Some(500), Some(20), None, None; "non default gid and uid")] #[test_case(Some(500), Some(20), Some(0o555), Some(0o444); "non default gid, uid and permissions")] fn permission_config_test_mock(uid: Option, gid: Option, dir_mode: Option, file_mode: Option) { - perm_test(crate::fuse_tests::mock_session::new, uid, gid, dir_mode, file_mode); + perm_test(fuse::mock_session::new, uid, gid, dir_mode, file_mode); } #[cfg(feature = "s3_tests")] @@ -185,7 +185,7 @@ fn permission_config_test_negative_s3( dir_mode: Option, file_mode: Option, ) { - perm_test_negative(crate::fuse_tests::s3_session::new, uid, gid, dir_mode, file_mode); + perm_test_negative(fuse::s3_session::new, uid, gid, dir_mode, file_mode); } #[test_case(None, None, Some(0o000), Some(0o000); "no permissions")] @@ -196,5 +196,5 @@ fn permission_config_test_negative_mock( dir_mode: Option, file_mode: Option, ) { - perm_test_negative(crate::fuse_tests::mock_session::new, uid, gid, dir_mode, file_mode); + perm_test_negative(fuse::mock_session::new, uid, gid, dir_mode, file_mode); } diff --git a/mountpoint-s3/tests/fuse_tests/prefetch_test.rs b/mountpoint-s3/tests/fuse_tests/prefetch_test.rs index 25f994301..c7ece49af 100644 --- a/mountpoint-s3/tests/fuse_tests/prefetch_test.rs +++ b/mountpoint-s3/tests/fuse_tests/prefetch_test.rs @@ -6,7 +6,7 @@ use std::io::Read; use tempfile::TempDir; use test_case::test_case; -use crate::fuse_tests::{TestClientBox, TestSessionConfig}; +use crate::common::fuse::{self, TestClientBox, TestSessionConfig}; fn read_test(creator_fn: F, object_size: usize) where @@ -34,7 +34,7 @@ where #[test_case(1; "single-byte file")] #[test_case(1024*1024; "1MiB file")] fn read_test_s3(object_size: usize) { - read_test(crate::fuse_tests::s3_session::new, object_size); + read_test(fuse::s3_session::new, object_size); } #[cfg(feature = "s3_tests")] @@ -43,7 +43,7 @@ fn read_test_s3(object_size: usize) { #[test_case(1024*1024; "1MiB file")] fn read_test_s3_with_cache(object_size: usize) { read_test( - crate::fuse_tests::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + fuse::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), object_size, ); } @@ -52,7 +52,7 @@ fn read_test_s3_with_cache(object_size: usize) { #[test_case(1; "single-byte file")] #[test_case(1024*1024; "1MiB file")] fn read_test_mock(object_size: usize) { - read_test(crate::fuse_tests::mock_session::new, object_size); + read_test(fuse::mock_session::new, object_size); } #[test_case(0; "empty file")] @@ -60,7 +60,7 @@ fn read_test_mock(object_size: usize) { #[test_case(1024*1024; "1MiB file")] fn read_test_mock_with_cache(object_size: usize) { read_test( - crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + fuse::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), object_size, ); } @@ -151,7 +151,7 @@ where #[test_case(64 * 1024, 500 * 1024; "first request size smaller than first block read size")] fn prefetch_test_etag_mock(request_size: usize, read_size: usize) { prefetch_test_etag( - crate::fuse_tests::mock_session::new, + fuse::mock_session::new, "prefetch_test_etag_mock", request_size, read_size, @@ -164,7 +164,7 @@ fn prefetch_test_etag_mock(request_size: usize, read_size: usize) { #[test_case(64 * 1024, 500 * 1024; "first request size smaller than first block read size")] fn prefetch_test_etag_mock_with_cache(request_size: usize, read_size: usize) { prefetch_test_etag( - crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + fuse::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), "prefetch_test_etag_mock", request_size, read_size, @@ -177,12 +177,7 @@ fn prefetch_test_etag_mock_with_cache(request_size: usize, read_size: usize) { #[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] #[test_case(256 * 1024, 256 * 1024; "first request size smaller than first block read size")] fn prefetch_test_etag_s3(request_size: usize, read_size: usize) { - prefetch_test_etag( - crate::fuse_tests::s3_session::new, - "prefetch_test_etag_s3", - request_size, - read_size, - ); + prefetch_test_etag(fuse::s3_session::new, "prefetch_test_etag_s3", request_size, read_size); } #[cfg(feature = "s3_tests")] @@ -192,7 +187,7 @@ fn prefetch_test_etag_s3(request_size: usize, read_size: usize) { #[test_case(256 * 1024, 256 * 1024; "first request size smaller than first block read size")] fn prefetch_test_etag_s3_with_cache(request_size: usize, read_size: usize) { prefetch_test_etag( - crate::fuse_tests::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + fuse::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), "prefetch_test_etag_s3", request_size, read_size, diff --git a/mountpoint-s3/tests/fuse_tests/read_test.rs b/mountpoint-s3/tests/fuse_tests/read_test.rs index e65776a71..8f1c492d9 100644 --- a/mountpoint-s3/tests/fuse_tests/read_test.rs +++ b/mountpoint-s3/tests/fuse_tests/read_test.rs @@ -12,7 +12,7 @@ use rand_chacha::ChaChaRng; use tempfile::TempDir; use test_case::test_case; -use crate::fuse_tests::{read_dir_to_entry_names, TestClientBox, TestSessionConfig}; +use crate::common::fuse::{self, read_dir_to_entry_names, TestClientBox, TestSessionConfig}; fn basic_read_test(creator_fn: F, prefix: &str) where @@ -64,14 +64,14 @@ where #[cfg(feature = "s3_tests")] #[test] fn basic_read_test_s3() { - basic_read_test(crate::fuse_tests::s3_session::new, "basic_read_test"); + basic_read_test(fuse::s3_session::new, "basic_read_test"); } #[cfg(feature = "s3_tests")] #[test] fn basic_read_test_s3_with_cache() { basic_read_test( - crate::fuse_tests::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + fuse::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), "basic_read_test", ); } @@ -79,14 +79,14 @@ fn basic_read_test_s3_with_cache() { #[test_case("")] #[test_case("basic_read_test")] fn basic_read_test_mock(prefix: &str) { - basic_read_test(crate::fuse_tests::mock_session::new, prefix); + basic_read_test(fuse::mock_session::new, prefix); } #[test_case("")] #[test_case("basic_read_test")] fn basic_read_test_mock_with_cache(prefix: &str) { basic_read_test( - crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + fuse::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), prefix, ); } @@ -160,7 +160,7 @@ where fn read_flexible_retrieval_test_s3() { const FILES: &[&str] = &["STANDARD", "GLACIER_IR", "GLACIER", "DEEP_ARCHIVE"]; read_flexible_retrieval_test( - crate::fuse_tests::s3_session::new, + fuse::s3_session::new, "read_flexible_retrieval_test", FILES, RestorationOptions::None, @@ -171,12 +171,7 @@ fn read_flexible_retrieval_test_s3() { #[test_case("read_flexible_retrieval_test"; "prefix")] fn read_flexible_retrieval_test_mock(prefix: &str) { const FILES: &[&str] = &["STANDARD", "GLACIER_IR", "GLACIER", "DEEP_ARCHIVE"]; - read_flexible_retrieval_test( - crate::fuse_tests::mock_session::new, - prefix, - FILES, - RestorationOptions::None, - ); + read_flexible_retrieval_test(fuse::mock_session::new, prefix, FILES, RestorationOptions::None); } #[test_case(""; "no prefix")] @@ -184,7 +179,7 @@ fn read_flexible_retrieval_test_mock(prefix: &str) { fn read_flexible_retrieval_restored_test_mock(prefix: &str) { const FILES: &[&str] = &["GLACIER", "DEEP_ARCHIVE"]; read_flexible_retrieval_test( - crate::fuse_tests::mock_session::new, + fuse::mock_session::new, prefix, FILES, RestorationOptions::RestoreAndWait, @@ -199,7 +194,7 @@ fn read_flexible_retrieval_restored_test_mock(prefix: &str) { fn read_flexible_retrieval_restored_test_s3() { const RESTORED_FILES: &[&str] = &["GLACIER"]; read_flexible_retrieval_test( - crate::fuse_tests::s3_session::new, + fuse::s3_session::new, "read_flexible_retrieval_restored_test_s3", RESTORED_FILES, RestorationOptions::RestoreAndWait, @@ -211,7 +206,7 @@ fn read_flexible_retrieval_restored_test_s3() { fn read_flexible_retrieval_restoring_test_s3() { const RESTORING_FILES: &[&str] = &["GLACIER", "DEEP_ARCHIVE"]; read_flexible_retrieval_test( - crate::fuse_tests::s3_session::new, + fuse::s3_session::new, "read_flexible_retrieval_restoring_test_s3", RESTORING_FILES, RestorationOptions::RestoreInProgress, diff --git a/mountpoint-s3/tests/fuse_tests/readdir_test.rs b/mountpoint-s3/tests/fuse_tests/readdir_test.rs index 6e1da018e..04d7eba7e 100644 --- a/mountpoint-s3/tests/fuse_tests/readdir_test.rs +++ b/mountpoint-s3/tests/fuse_tests/readdir_test.rs @@ -1,4 +1,4 @@ -use crate::fuse_tests::{read_dir_to_entry_names, TestClientBox, TestSessionConfig}; +use crate::common::fuse::{self, read_dir_to_entry_names, TestClientBox, TestSessionConfig}; use fuser::BackgroundSession; use mountpoint_s3::S3FilesystemConfig; use rand::distributions::{Alphanumeric, DistString}; @@ -137,14 +137,14 @@ where #[test] fn readdir_s3() { let rng_seed = rand::thread_rng().gen(); - readdir(crate::fuse_tests::s3_session::new, "", rng_seed); + readdir(fuse::s3_session::new, "", rng_seed); } #[cfg(feature = "s3_tests")] #[test] fn readdir_while_writing_s3() { let rng_seed = rand::thread_rng().gen(); - readdir_while_writing(crate::fuse_tests::s3_session::new, "", rng_seed); + readdir_while_writing(fuse::s3_session::new, "", rng_seed); } #[test] @@ -152,7 +152,7 @@ fn readdir_mock() { let iteration = 10; for _ in 0..iteration { let rng_seed = rand::thread_rng().gen(); - readdir(crate::fuse_tests::mock_session::new, "", rng_seed); + readdir(fuse::mock_session::new, "", rng_seed); } } @@ -161,6 +161,6 @@ fn readdir_while_writing_mock() { let iteration = 10; for _ in 0..iteration { let rng_seed = rand::thread_rng().gen(); - readdir_while_writing(crate::fuse_tests::mock_session::new, "", rng_seed); + readdir_while_writing(fuse::mock_session::new, "", rng_seed); } } diff --git a/mountpoint-s3/tests/fuse_tests/rmdir_test.rs b/mountpoint-s3/tests/fuse_tests/rmdir_test.rs index 04d4c9afb..c4e3a9ca8 100644 --- a/mountpoint-s3/tests/fuse_tests/rmdir_test.rs +++ b/mountpoint-s3/tests/fuse_tests/rmdir_test.rs @@ -1,12 +1,10 @@ -use crate::fuse_tests::{read_dir_to_entry_names, TestSessionConfig}; +use crate::common::fuse::{self, read_dir_to_entry_names, TestClientBox, TestSessionConfig}; use fuser::BackgroundSession; use std::fs::{self, DirBuilder, File}; use std::io::Write; use tempfile::TempDir; use test_case::test_case; -use crate::fuse_tests::TestClientBox; - fn rmdir_local_dir_test(creator_fn: F, prefix: &str) where F: FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox), @@ -71,14 +69,14 @@ where #[test_case(""; "no prefix")] #[test_case("rmdir_test"; "prefix")] fn rmdir_local_dir_test_mock(prefix: &str) { - rmdir_local_dir_test(crate::fuse_tests::mock_session::new, prefix); + rmdir_local_dir_test(fuse::mock_session::new, prefix); } #[cfg(feature = "s3_tests")] #[test_case(""; "no prefix")] #[test_case("rmdir_test"; "prefix")] fn rmdir_local_dir_test_s3(prefix: &str) { - rmdir_local_dir_test(crate::fuse_tests::s3_session::new, prefix); + rmdir_local_dir_test(fuse::s3_session::new, prefix); } fn rmdir_remote_dir_test(creator_fn: F, prefix: &str) @@ -132,14 +130,14 @@ where #[test_case(""; "no prefix")] #[test_case("rmdir_test"; "prefix")] fn rmdir_remote_dir_test_mock(prefix: &str) { - rmdir_remote_dir_test(crate::fuse_tests::mock_session::new, prefix); + rmdir_remote_dir_test(fuse::mock_session::new, prefix); } #[cfg(feature = "s3_tests")] #[test_case(""; "no prefix")] #[test_case("rmdir_test"; "prefix")] fn rmdir_remote_dir_test_s3(prefix: &str) { - rmdir_remote_dir_test(crate::fuse_tests::s3_session::new, prefix); + rmdir_remote_dir_test(fuse::s3_session::new, prefix); } fn create_after_rmdir_test(creator_fn: F, prefix: &str) @@ -176,12 +174,12 @@ where #[test_case(""; "no prefix")] #[test_case("rmdir_test"; "prefix")] fn create_after_rmdir_test_mock(prefix: &str) { - create_after_rmdir_test(crate::fuse_tests::mock_session::new, prefix); + create_after_rmdir_test(fuse::mock_session::new, prefix); } #[cfg(feature = "s3_tests")] #[test_case(""; "no prefix")] #[test_case("rmdir_test"; "prefix")] fn create_after_rmdir_test_s3(prefix: &str) { - create_after_rmdir_test(crate::fuse_tests::s3_session::new, prefix); + create_after_rmdir_test(fuse::s3_session::new, prefix); } diff --git a/mountpoint-s3/tests/fuse_tests/semantics_doc_test.rs b/mountpoint-s3/tests/fuse_tests/semantics_doc_test.rs index 3cc49add1..e10e31022 100644 --- a/mountpoint-s3/tests/fuse_tests/semantics_doc_test.rs +++ b/mountpoint-s3/tests/fuse_tests/semantics_doc_test.rs @@ -4,7 +4,7 @@ use fuser::BackgroundSession; use tempfile::TempDir; use walkdir::WalkDir; -use crate::fuse_tests::{TestClientBox, TestSessionConfig}; +use crate::common::fuse::{self, TestClientBox, TestSessionConfig}; /// Recursively list the contents of a directory and return the paths of all entries, with the /// initial `path` stripped. If `files` is true, the list contains only files; if false, it contains @@ -72,13 +72,13 @@ where #[test] fn basic_directory_structure_mock() { - basic_directory_structure(crate::fuse_tests::mock_session::new); + basic_directory_structure(fuse::mock_session::new); } #[cfg(feature = "s3_tests")] #[test] fn basic_directory_structure_s3() { - basic_directory_structure(crate::fuse_tests::s3_session::new); + basic_directory_structure(fuse::s3_session::new); } /// Object keys that end in the path delimiter (`/`) will not be accessible. Instead, a directory of @@ -111,13 +111,13 @@ where #[test] fn keys_ending_in_delimiter_mock() { - keys_ending_in_delimiter(crate::fuse_tests::mock_session::new); + keys_ending_in_delimiter(fuse::mock_session::new); } #[cfg(feature = "s3_tests")] #[test] fn keys_ending_in_delimiter_s3() { - keys_ending_in_delimiter(crate::fuse_tests::s3_session::new); + keys_ending_in_delimiter(fuse::s3_session::new); } /// Files will be shadowed by directories with the same name. For example, if your bucket has the @@ -147,11 +147,11 @@ where #[test] fn files_shadowed_by_directories_mock() { - files_shadowed_by_directories(crate::fuse_tests::mock_session::new); + files_shadowed_by_directories(fuse::mock_session::new); } #[cfg(feature = "s3_tests")] #[test] fn files_shadowed_by_directories_s3() { - files_shadowed_by_directories(crate::fuse_tests::s3_session::new); + files_shadowed_by_directories(fuse::s3_session::new); } diff --git a/mountpoint-s3/tests/fuse_tests/setattr_test.rs b/mountpoint-s3/tests/fuse_tests/setattr_test.rs index 7d74c1cd6..089bae69e 100644 --- a/mountpoint-s3/tests/fuse_tests/setattr_test.rs +++ b/mountpoint-s3/tests/fuse_tests/setattr_test.rs @@ -8,7 +8,7 @@ use fuser::BackgroundSession; use tempfile::TempDir; use test_case::test_case; -use crate::fuse_tests::{TestClientBox, TestSessionConfig}; +use crate::common::fuse::{self, TestClientBox, TestSessionConfig}; fn open_for_write(path: impl AsRef, append: bool) -> std::io::Result { let mut options = File::options(); @@ -68,11 +68,11 @@ where #[test_case(true; "append")] #[test_case(false; "no append")] fn setattr_test_s3(append: bool) { - setattr_test(crate::fuse_tests::s3_session::new, "setattr_test_s3", append); + setattr_test(fuse::s3_session::new, "setattr_test_s3", append); } #[test_case(true; "append")] #[test_case(false; "no append")] fn setattr_test_mock(append: bool) { - setattr_test(crate::fuse_tests::mock_session::new, "setattr_test_mock", append); + setattr_test(fuse::mock_session::new, "setattr_test_mock", append); } diff --git a/mountpoint-s3/tests/fuse_tests/unlink_test.rs b/mountpoint-s3/tests/fuse_tests/unlink_test.rs index 41e305557..7d59cc41f 100644 --- a/mountpoint-s3/tests/fuse_tests/unlink_test.rs +++ b/mountpoint-s3/tests/fuse_tests/unlink_test.rs @@ -6,7 +6,7 @@ use mountpoint_s3::S3FilesystemConfig; use tempfile::TempDir; use test_case::test_case; -use crate::fuse_tests::{read_dir_to_entry_names, TestClientBox, TestSessionConfig}; +use crate::common::fuse::{self, read_dir_to_entry_names, TestClientBox, TestSessionConfig}; /// Simple test cases, assuming a file isn't open for reading elsewhere. fn simple_unlink_tests(creator_fn: F, prefix: &str) @@ -52,13 +52,13 @@ where #[cfg(feature = "s3_tests")] #[test] fn simple_unlink_test_s3() { - simple_unlink_tests(crate::fuse_tests::s3_session::new, "simple_unlink_tests"); + simple_unlink_tests(fuse::s3_session::new, "simple_unlink_tests"); } #[test_case(""; "no prefix")] #[test_case("simple_unlink_test"; "prefix")] fn simple_unlink_test_mock(prefix: &str) { - simple_unlink_tests(crate::fuse_tests::mock_session::new, prefix); + simple_unlink_tests(fuse::mock_session::new, prefix); } /// Testing behavior when a file is unlinked in the middle of reading @@ -108,13 +108,13 @@ where #[cfg(feature = "s3_tests")] #[test] fn unlink_readhandle_test_s3() { - unlink_readhandle_test(crate::fuse_tests::s3_session::new, "unlink_readhandle_test"); + unlink_readhandle_test(fuse::s3_session::new, "unlink_readhandle_test"); } #[test_case(""; "no prefix")] #[test_case("unlink_readhandle_test"; "prefix")] fn unlink_readhandle_test_mock(prefix: &str) { - unlink_readhandle_test(crate::fuse_tests::mock_session::new, prefix); + unlink_readhandle_test(fuse::mock_session::new, prefix); } /// Testing behavior when a file is unlinked during and after writing @@ -176,13 +176,13 @@ where #[cfg(feature = "s3_tests")] #[test] fn unlink_writehandle_test_s3() { - unlink_writehandle_test(crate::fuse_tests::s3_session::new, "unlink_writehandle_test"); + unlink_writehandle_test(fuse::s3_session::new, "unlink_writehandle_test"); } #[test_case(""; "no prefix")] #[test_case("unlink_writehandle_test"; "prefix")] fn unlink_writehandle_test_mock(prefix: &str) { - unlink_writehandle_test(crate::fuse_tests::mock_session::new, prefix); + unlink_writehandle_test(fuse::mock_session::new, prefix); } fn unlink_fail_on_delete_not_allowed_test(creator_fn: F) @@ -210,10 +210,10 @@ where #[cfg(feature = "s3_tests")] #[test] fn unlink_fail_on_delete_not_allowed_test_s3() { - unlink_fail_on_delete_not_allowed_test(crate::fuse_tests::s3_session::new); + unlink_fail_on_delete_not_allowed_test(fuse::s3_session::new); } #[test] fn unlink_fail_on_delete_not_allowed_test_mock() { - unlink_fail_on_delete_not_allowed_test(crate::fuse_tests::mock_session::new); + unlink_fail_on_delete_not_allowed_test(fuse::mock_session::new); } diff --git a/mountpoint-s3/tests/fuse_tests/write_test.rs b/mountpoint-s3/tests/fuse_tests/write_test.rs index ae51de305..38ccaefaf 100644 --- a/mountpoint-s3/tests/fuse_tests/write_test.rs +++ b/mountpoint-s3/tests/fuse_tests/write_test.rs @@ -13,7 +13,7 @@ use test_case::test_case; use mountpoint_s3::S3FilesystemConfig; -use crate::fuse_tests::{read_dir_to_entry_names, TestClientBox, TestSessionConfig}; +use crate::common::fuse::{self, read_dir_to_entry_names, TestClientBox, TestSessionConfig}; fn open_for_write(path: impl AsRef, append: bool) -> std::io::Result { let mut options = File::options(); @@ -136,7 +136,7 @@ where #[test_case(true; "append")] #[test_case(false; "no append")] fn sequential_write_test_s3(append: bool) { - sequential_write_test(crate::fuse_tests::s3_session::new, "sequential_write_test", append); + sequential_write_test(fuse::s3_session::new, "sequential_write_test", append); } #[test_case("", true; "no prefix append")] @@ -144,7 +144,7 @@ fn sequential_write_test_s3(append: bool) { #[test_case("sequential_write_test", true; "prefix append")] #[test_case("sequential_write_test", false; "prefix no append")] fn sequential_write_test_mock(prefix: &str, append: bool) { - sequential_write_test(crate::fuse_tests::mock_session::new, prefix, append); + sequential_write_test(fuse::mock_session::new, prefix, append); } fn write_errors_test(creator_fn: F, prefix: &str) @@ -192,13 +192,13 @@ where #[cfg(feature = "s3_tests")] #[test] fn write_errors_test_s3() { - write_errors_test(crate::fuse_tests::s3_session::new, "write_errors_test"); + write_errors_test(fuse::s3_session::new, "write_errors_test"); } #[test_case(""; "no prefix append")] #[test_case("sequential_write_test"; "prefix")] fn write_errors_test_mock(prefix: &str) { - write_errors_test(crate::fuse_tests::mock_session::new, prefix); + write_errors_test(fuse::mock_session::new, prefix); } fn sequential_write_streaming_test(creator_fn: F, object_size: usize, write_chunk_size: usize) @@ -257,7 +257,7 @@ where #[test_case(32, 32)] #[test_case(32 * 1024 * 1024, 1024 * 1024 + 1)] fn sequential_write_streaming_test_s3(object_size: usize, write_chunk_size: usize) { - sequential_write_streaming_test(crate::fuse_tests::s3_session::new, object_size, write_chunk_size); + sequential_write_streaming_test(fuse::s3_session::new, object_size, write_chunk_size); } #[test_case(0, 1)] @@ -265,7 +265,7 @@ fn sequential_write_streaming_test_s3(object_size: usize, write_chunk_size: usiz #[test_case(32, 32)] #[test_case(32 * 1024 * 1024, 1024 * 1024 + 1)] fn sequential_write_streaming_test_mock(object_size: usize, write_chunk_size: usize) { - sequential_write_streaming_test(crate::fuse_tests::mock_session::new, object_size, write_chunk_size); + sequential_write_streaming_test(fuse::mock_session::new, object_size, write_chunk_size); } fn fsync_test(creator_fn: F) @@ -314,12 +314,12 @@ where #[cfg(feature = "s3_tests")] #[test] fn fsync_test_s3() { - fsync_test(crate::fuse_tests::s3_session::new); + fsync_test(fuse::s3_session::new); } #[test] fn fsync_test_mock() { - fsync_test(crate::fuse_tests::mock_session::new); + fsync_test(fuse::mock_session::new); } fn write_too_big_test(creator_fn: F, write_size: usize) @@ -369,7 +369,7 @@ where #[test_case(7000; "not divisible by max size")] #[test_case(640001; "single write too big")] fn write_too_big_test_mock(write_size: usize) { - write_too_big_test(crate::fuse_tests::mock_session::new, write_size); + write_too_big_test(fuse::mock_session::new, write_size); } fn out_of_order_write_test(creator_fn: F, offset: i64) @@ -415,13 +415,13 @@ where #[test_case(-1; "earlier offset")] #[test_case(1; "later offset")] fn out_of_order_write_test_s3(offset: i64) { - out_of_order_write_test(crate::fuse_tests::s3_session::new, offset); + out_of_order_write_test(fuse::s3_session::new, offset); } #[test_case(-1; "earlier offset")] #[test_case(1; "later offset")] fn out_of_order_write_test_mock(offset: i64) { - out_of_order_write_test(crate::fuse_tests::mock_session::new, offset); + out_of_order_write_test(fuse::mock_session::new, offset); } fn write_with_storage_class_test(creator_fn: F, storage_class: Option<&str>) @@ -453,14 +453,14 @@ where #[test_case(Some("INTELLIGENT_TIERING"))] #[test_case(Some("GLACIER"))] fn write_with_storage_class_test_s3(storage_class: Option<&str>) { - write_with_storage_class_test(crate::fuse_tests::s3_session::new, storage_class); - write_with_storage_class_test(crate::fuse_tests::mock_session::new, storage_class); + write_with_storage_class_test(fuse::s3_session::new, storage_class); + write_with_storage_class_test(fuse::mock_session::new, storage_class); } #[test_case(Some("INTELLIGENT_TIERING"))] #[test_case(Some("GLACIER"))] fn write_with_storage_class_test_s3_mock(storage_class: Option<&str>) { - write_with_storage_class_test(crate::fuse_tests::mock_session::new, storage_class); + write_with_storage_class_test(fuse::mock_session::new, storage_class); } #[cfg_attr(not(feature = "s3_tests"), allow(unused))] // Mock client doesn't validate storage classes @@ -494,7 +494,7 @@ fn write_file(path: impl AsRef) -> std::io::Result<()> { #[cfg(feature = "s3_tests")] #[test_case("INVALID_CLASS")] fn write_with_invalid_storage_class_test_s3(storage_class: &str) { - write_with_invalid_storage_class_test(crate::fuse_tests::s3_session::new, storage_class); + write_with_invalid_storage_class_test(fuse::s3_session::new, storage_class); } fn flush_test(creator_fn: F, append: bool) @@ -538,13 +538,13 @@ where #[test_case(true; "append")] #[test_case(false; "no append")] fn flush_test_s3(append: bool) { - flush_test(crate::fuse_tests::s3_session::new, append); + flush_test(fuse::s3_session::new, append); } #[test_case(true; "append")] #[test_case(false; "no append")] fn flush_test_mock(append: bool) { - flush_test(crate::fuse_tests::mock_session::new, append); + flush_test(fuse::mock_session::new, append); } fn touch_test(creator_fn: F) @@ -583,12 +583,12 @@ where #[cfg(feature = "s3_tests")] #[test] fn touch_test_s3() { - touch_test(crate::fuse_tests::s3_session::new); + touch_test(fuse::s3_session::new); } #[test] fn touch_test_mock() { - touch_test(crate::fuse_tests::mock_session::new); + touch_test(fuse::mock_session::new); } fn dd_test(creator_fn: F) @@ -621,18 +621,18 @@ where #[cfg(feature = "s3_tests")] #[test] fn dd_test_s3() { - dd_test(crate::fuse_tests::s3_session::new); + dd_test(fuse::s3_session::new); } #[test] fn dd_test_mock() { - dd_test(crate::fuse_tests::mock_session::new); + dd_test(fuse::mock_session::new); } #[test] fn spawn_test() { const KEY: &str = "new.txt"; - let (mount_point, _session, _test_client) = crate::fuse_tests::mock_session::new("spawn_test", Default::default()); + let (mount_point, _session, _test_client) = fuse::mock_session::new("spawn_test", Default::default()); let path = mount_point.path().join(KEY); let mut f = open_for_write(&path, false).unwrap(); @@ -654,7 +654,7 @@ fn spawn_test() { #[test] fn multi_thread_test() { const KEY: &str = "new.txt"; - let (mount_point, _session, test_client) = crate::fuse_tests::mock_session::new("spawn_test", Default::default()); + let (mount_point, _session, test_client) = fuse::mock_session::new("spawn_test", Default::default()); let path = mount_point.path().join(KEY); let mut f = open_for_write(&path, false).unwrap(); From c62fd82e1e6d2b2e5c2dde7d75dccf8aa6942092 Mon Sep 17 00:00:00 2001 From: Alessandro Passaro Date: Tue, 21 Nov 2023 18:09:47 +0000 Subject: [PATCH 3/4] Add Rustdoc for the common test module Signed-off-by: Alessandro Passaro --- mountpoint-s3/tests/common/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mountpoint-s3/tests/common/mod.rs b/mountpoint-s3/tests/common/mod.rs index 8ae345fc0..9e981d4e0 100644 --- a/mountpoint-s3/tests/common/mod.rs +++ b/mountpoint-s3/tests/common/mod.rs @@ -1,3 +1,5 @@ +//! Functions and types shared across integration test modules. +//! Allow for unused code since this is included independently in each module. #![allow(unused)] #[cfg(feature = "fuse_tests")] From 7be9f4ead35851c4632dcebc2efc73e09dcf711d Mon Sep 17 00:00:00 2001 From: Alessandro Passaro Date: Tue, 21 Nov 2023 18:10:58 +0000 Subject: [PATCH 4/4] Configure the direct_io tests with the correct feature ("fuse_tests") Signed-off-by: Alessandro Passaro --- mountpoint-s3/tests/direct_io.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mountpoint-s3/tests/direct_io.rs b/mountpoint-s3/tests/direct_io.rs index faee95f2c..4586b6a29 100644 --- a/mountpoint-s3/tests/direct_io.rs +++ b/mountpoint-s3/tests/direct_io.rs @@ -3,7 +3,7 @@ //! with a fork, and FUSE tests involve a fork to spawn fusermount. #![cfg(target_os = "linux")] -#![cfg(feature = "s3_tests")] +#![cfg(feature = "fuse_tests")] mod common;