Skip to content

Commit

Permalink
File: add option create_only
Browse files Browse the repository at this point in the history
  • Loading branch information
tyt2y3 committed Sep 21, 2023
1 parent 44585f5 commit 7f66afc
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 9 deletions.
9 changes: 9 additions & 0 deletions sea-streamer-file/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ impl AsyncFile {
Self::new_with(id, file).await
}

/// Always create a new file. If the file already exists, abort.
pub async fn new_w(id: FileId) -> Result<Self, FileErr> {
log::debug!("AsyncFile Create ({})", id.path());
let mut options = OpenOptions::new();
options.write(true).create_new(true);
let file = options.open(id.path()).await.map_err(FileErr::IoError)?;
Self::new_with(id, file).await
}

async fn new_with(id: FileId, file: File) -> Result<Self, FileErr> {
let size = file_size_of(&file).await?;
let pos = 0;
Expand Down
44 changes: 35 additions & 9 deletions sea-streamer-file/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,22 @@ pub struct FileStreamer {

#[derive(Debug, Clone)]
pub struct FileConnectOptions {
create_if_not_exists: bool,
create_file: CreateFileOption,
end_with_eos: bool,
beacon_interval: u32,
file_size_limit: u64,
prefetch_message: usize,
}

#[derive(Debug, Clone)]
enum CreateFileOption {
/// File must already exists
Never,
CreateIfNotExists,
/// Fail if the file already exists
Always,
}

#[derive(Debug, Clone)]
pub struct FileConsumerOptions {
mode: ConsumerMode,
Expand Down Expand Up @@ -88,11 +97,11 @@ impl StreamerTrait for FileStreamer {
.trim_start_matches("file://")
.trim_end_matches('/');
let file_id = FileId::new(path);
if options.create_if_not_exists {
AsyncFile::new_rw(file_id.clone()).await?;
} else {
AsyncFile::new_r(file_id.clone()).await?;
}
match options.create_file {
CreateFileOption::Never => AsyncFile::new_r(file_id.clone()).await,
CreateFileOption::CreateIfNotExists => AsyncFile::new_rw(file_id.clone()).await,
CreateFileOption::Always => AsyncFile::new_w(file_id.clone()).await,
}?;
Ok(Self { file_id, options })
}

Expand Down Expand Up @@ -187,7 +196,7 @@ impl ConnectOptionsTrait for FileConnectOptions {
impl Default for FileConnectOptions {
fn default() -> Self {
Self {
create_if_not_exists: false,
create_file: CreateFileOption::Never,
end_with_eos: false,
beacon_interval: DEFAULT_BEACON_INTERVAL,
file_size_limit: DEFAULT_FILE_SIZE_LIMIT,
Expand All @@ -198,11 +207,28 @@ impl Default for FileConnectOptions {

impl FileConnectOptions {
pub fn create_if_not_exists(&self) -> bool {
self.create_if_not_exists
matches!(self.create_file, CreateFileOption::CreateIfNotExists)
}
/// Default is `false`.
pub fn set_create_if_not_exists(&mut self, v: bool) -> &mut Self {
self.create_if_not_exists = v;
if v {
self.create_file = CreateFileOption::CreateIfNotExists;
} else {
self.create_file = CreateFileOption::Never;
}
self
}

pub fn create_only(&self) -> bool {
matches!(self.create_file, CreateFileOption::Always)
}
/// Always create the file. Fail if already exists. Default is `false`.
pub fn set_create_only(&mut self, v: bool) -> &mut Self {
if v {
self.create_file = CreateFileOption::Always;
} else {
self.create_file = CreateFileOption::Never;
}
self
}

Expand Down
4 changes: 4 additions & 0 deletions sea-streamer-runtime/src/file/no_rt_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ impl OpenOptions {
self
}

pub fn create_new(&mut self, _: bool) -> &mut OpenOptions {
self
}

pub async fn open(&self, _: impl AsRef<Path>) -> Result<File, IoError> {
Err(IoError::new(ErrorKind::Other, "Please enable a runtime"))
}
Expand Down

0 comments on commit 7f66afc

Please sign in to comment.