Skip to content

Commit

Permalink
Implement O_DIRECT for open to bypass metadata cache if enabled
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Carl Jones <[email protected]>
  • Loading branch information
dannycjones committed Nov 20, 2023
1 parent 65cb1a7 commit ae0fe6a
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 2 deletions.
12 changes: 10 additions & 2 deletions mountpoint-s3/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::time::{Duration, UNIX_EPOCH};
use time::OffsetDateTime;
use tracing::{debug, error, trace};

use fuser::consts::FOPEN_DIRECT_IO;
use fuser::{FileAttr, KernelConfig};
use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
use mountpoint_s3_client::types::ETag;
Expand Down Expand Up @@ -562,7 +563,12 @@ where
pub async fn open(&self, ino: InodeNo, flags: i32, pid: u32) -> Result<Opened, Error> {
trace!("fs:open with ino {:?} flags {:?} pid {:?}", ino, flags, pid);

let force_revalidate = !self.config.cache_config.serve_lookup_from_cache;
#[cfg(not(target_os = "linux"))]
let direct_io = false;
#[cfg(target_os = "linux")]
let direct_io = flags & libc::O_DIRECT != 0;

let force_revalidate = !self.config.cache_config.serve_lookup_from_cache || direct_io;
let lookup = self.superblock.getattr(&self.client, ino, force_revalidate).await?;

match lookup.inode.kind() {
Expand Down Expand Up @@ -596,7 +602,9 @@ where
};
self.file_handles.write().await.insert(fh, Arc::new(handle));

Ok(Opened { fh, flags: 0 })
let reply_flags = if direct_io { FOPEN_DIRECT_IO } else { 0 };

Ok(Opened { fh, flags: reply_flags })
}

#[allow(clippy::too_many_arguments)] // We don't get to choose this interface
Expand Down
95 changes: 95 additions & 0 deletions mountpoint-s3/tests/fuse_tests/consistency_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,98 @@ fn page_cache_sharing_test_mock_with_cache(prefix: &str) {
prefix,
);
}

#[cfg(target_os = "linux")]
mod direct_io {
use super::*;

use std::fs::OpenOptions;
use std::os::unix::fs::OpenOptionsExt;
use std::time::Duration;

use test_case::test_case;

use mountpoint_s3::fs::{CacheConfig, S3FilesystemConfig};

fn cache_and_direct_io_test<F>(creator_fn: F, prefix: &str)
where
F: FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox),
{
const OBJECT_SIZE: usize = 8;

let test_session_conf = TestSessionConfig {
filesystem_config: S3FilesystemConfig {
cache_config: CacheConfig {
serve_lookup_from_cache: true,
dir_ttl: Duration::from_secs(600),
file_ttl: Duration::from_secs(600),
},
..Default::default()
},
..Default::default()
};
let (mount_point, _session, mut test_client) = creator_fn(prefix, test_session_conf);

let file_name = "file.bin";

// Create the first version of the file
let old_contents = vec![0x0fu8; OBJECT_SIZE];
test_client.put_object(file_name, &old_contents).unwrap();

// Open and read fully the file before updating it remotely
let old_file = File::open(mount_point.path().join(file_name)).unwrap();
let mut buf = vec![0u8; OBJECT_SIZE];
old_file.read_exact_at(&mut buf, 0).unwrap();
assert_eq!(buf, &old_contents[..buf.len()]);

let new_contents = vec![0xffu8; OBJECT_SIZE];
test_client.put_object(file_name, &new_contents).unwrap();

// Open the file again, which should be reading from cache
for _ in 0..2 {
let new_file = File::open(mount_point.path().join(file_name)).unwrap();
new_file
.read_exact_at(&mut buf, 0)
.expect("should be OK as result is cached");
assert_eq!(
buf,
&old_contents[..buf.len()],
"bytes read should be old object from cache"
);
}

// Open the file w/ O_DIRECT, which should see the new file on S3 despite the old file being cached
let mut buf = [0u8; OBJECT_SIZE];
let new_file = OpenOptions::new()
.read(true)
.custom_flags(libc::O_DIRECT)
.open(mount_point.path().join(file_name))
.unwrap();
new_file
.read_exact_at(&mut buf, 0)
.expect("should be able to read file content from S3");
assert_eq!(
buf,
&new_contents[..buf.len()],
"bytes read should be new bytes from S3 client"
);
}

#[test_case(""; "no prefix")]
#[test_case("cache_and_direct_io_test_mock"; "prefix")]
fn cache_and_direct_io_test_mock(prefix: &str) {
cache_and_direct_io_test(
crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)),
prefix,
);
}

#[cfg(feature = "s3_tests")]
#[test]
fn cache_and_direct_io_test_s3() {
cache_and_direct_io_test(
crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)),
"cache_and_direct_io_test_s3",
);
}
}

0 comments on commit ae0fe6a

Please sign in to comment.