diff --git a/src/core/tests.rs b/src/core/tests.rs index aaa8628bd0..be3d4369b0 100644 --- a/src/core/tests.rs +++ b/src/core/tests.rs @@ -1,12 +1,16 @@ +use std::path::PathBuf; + +use tempfile::TempDir; + use crate::collector::Count; -use crate::directory::{RamDirectory, WatchCallback}; -use crate::indexer::NoMergePolicy; +use crate::directory::{RamDirectory, RoRamDirectory, WatchCallback}; +use crate::indexer::{LogMergePolicy, NoMergePolicy}; +use crate::postings::Postings; use crate::query::TermQuery; use crate::schema::{Field, IndexRecordOption, Schema, INDEXED, STRING, TEXT}; use crate::tokenizer::TokenizerManager; use crate::{ - Directory, Document, Index, IndexBuilder, IndexReader, IndexSettings, ReloadPolicy, SegmentId, - Term, + Directory, Document, Index, IndexBuilder, IndexReader, IndexSettings, IndexWriter, ReloadPolicy, SegmentId, Term }; #[test] @@ -344,3 +348,35 @@ fn test_merging_segment_update_docfreq() { let term_info = inv_index.get_term_info(&term).unwrap().unwrap(); assert_eq!(term_info.doc_freq, 12); } + +#[test] +fn test_read_only_dir() -> crate::Result<()> { + let schema = throw_away_schema(); + let field = schema.get_field("num_likes").unwrap(); + let tempdir = TempDir::new().unwrap(); + let tempdir_path = PathBuf::from(tempdir.path()); + + let mut write_index = Index::create_in_dir(&tempdir_path, schema.clone())?; + + // let mut index = Index::create_from_tempdir(schema)?; + let mut writer: IndexWriter = write_index.writer_for_tests()?; + writer.commit()?; + let dir = RoRamDirectory::new(&tempdir_path).unwrap(); + let mut index = Index::open_or_create(dir, schema.clone())?; + let reader = index + .reader_builder() + .reload_policy(ReloadPolicy::Manual) + .try_into()?; + assert_eq!(reader.searcher().num_docs(), 0); + writer.add_document(doc!(field=>1u64))?; + let (sender, receiver) = crossbeam_channel::unbounded(); + let _handle = index.directory_mut().watch(WatchCallback::new(move || { + let _ = sender.send(()); + })); + writer.commit()?; + assert!(receiver.recv().is_ok()); + assert_eq!(reader.searcher().num_docs(), 0); + reader.reload()?; + assert_eq!(reader.searcher().num_docs(), 1); + Ok(()) +} diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index 7419cbe49b..9588132a13 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -297,9 +297,9 @@ impl MmapDirectory { /// uses BSD locks (`flock`). The lock is actually released when /// the `File` object is dropped and its associated file descriptor /// is closed. -struct ReleaseLockFile { - _file: File, - path: PathBuf, +pub(crate) struct ReleaseLockFile { + pub(crate) _file: File, + pub(crate) path: PathBuf, } impl Drop for ReleaseLockFile { diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 1dda8579e9..6c7e57c6d5 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -10,6 +10,7 @@ mod footer; mod managed_directory; mod ram_directory; mod watch_event_router; +mod roram_directory; /// Errors specific to the directory module. pub mod error; @@ -27,6 +28,7 @@ pub use self::directory::{Directory, DirectoryClone, DirectoryLock}; pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK}; pub use self::ram_directory::RamDirectory; pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle}; +pub use self::roram_directory::RoRamDirectory; /// Outcome of the Garbage collection pub struct GarbageCollectionResult { diff --git a/src/directory/roram_directory.rs b/src/directory/roram_directory.rs new file mode 100644 index 0000000000..2355575465 --- /dev/null +++ b/src/directory/roram_directory.rs @@ -0,0 +1,212 @@ +use std::{ + collections::HashMap, + fs::{File, OpenOptions}, + io::{self, Read, Write}, + path::{Path, PathBuf}, + sync::{Arc, RwLock}, +}; + +use common::file_slice::FileSlice; +use fs4::FileExt; + +use crate::core::META_FILEPATH; + +use super::{ + error::{LockError, OpenReadError, OpenWriteError}, + file_watcher::FileWatcher, + mmap_directory::ReleaseLockFile, + Directory, DirectoryLock, META_LOCK, +}; + +/// RoRamDirectory is a read only directory that stores data in RAM. +/// Note: please make sure the index files exist before creating a RoRamDirectory. +#[derive(Clone)] +pub struct RoRamDirectory { + inner: Arc>, +} + +impl RoRamDirectory { + pub fn new(dir: &Path) -> Result { + Ok(RoRamDirectory { + inner: Arc::new(RwLock::new(RoRamDirectoryInner::new(dir)?)), + }) + } +} + +impl std::fmt::Debug for RoRamDirectory { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RoRamDirectory").finish() + } +} + +impl Directory for RoRamDirectory { + fn get_file_handle( + &self, + path: &std::path::Path, + ) -> Result, super::error::OpenReadError> + { + let file_slice = self.open_read(path)?; + Ok(Arc::new(file_slice)) + } + + fn open_read(&self, path: &Path) -> Result { + self.inner.write().unwrap().open_read(path) + } + + fn delete(&self, path: &std::path::Path) -> Result<(), super::error::DeleteError> { + self.inner.write().unwrap().delete(path); + Ok(()) + } + + fn exists(&self, path: &std::path::Path) -> Result { + Ok(self.inner.read().unwrap().exists(path)) + } + + fn open_write( + &self, + path: &std::path::Path, + ) -> Result { + unimplemented!() + } + + fn acquire_lock( + &self, + lock: &super::Lock, + ) -> Result { + let full_path = self.inner.read().unwrap().root_path.join(&lock.filepath); + // We make sure that the file exists. + let file: File = OpenOptions::new() + .write(true) + .create(true) //< if the file does not exist yet, create it. + .truncate(false) + .open(full_path) + .map_err(LockError::wrap_io_error)?; + if lock.is_blocking { + file.lock_exclusive().map_err(LockError::wrap_io_error)?; + } else { + file.try_lock_exclusive().map_err(|_| LockError::LockBusy)? + } + // dropping the file handle will release the lock. + Ok(DirectoryLock::from(Box::new(ReleaseLockFile { + path: lock.filepath.clone(), + _file: file, + }))) + } + + fn atomic_read(&self, path: &std::path::Path) -> Result, super::error::OpenReadError> { + let full_path = self.inner.read().unwrap().root_path.join(path); + let mut buffer = Vec::new(); + match File::open(full_path) { + Ok(mut file) => { + file.read_to_end(&mut buffer).map_err(|io_error| { + OpenReadError::wrap_io_error(io_error, path.to_path_buf()) + })?; + Ok(buffer) + } + Err(io_error) => { + if io_error.kind() == io::ErrorKind::NotFound { + Err(OpenReadError::FileDoesNotExist(path.to_owned())) + } else { + Err(OpenReadError::wrap_io_error(io_error, path.to_path_buf())) + } + } + } + } + + fn atomic_write(&self, _path: &std::path::Path, _data: &[u8]) -> std::io::Result<()> { + unimplemented!("RoRamDirectory is read-only") + } + + fn sync_directory(&self) -> std::io::Result<()> { + Ok(()) + } + + fn watch(&self, watch_callback: super::WatchCallback) -> crate::Result { + self.inner.read().unwrap().watch(watch_callback) + } +} + +struct RoRamDirectoryInner { + root_path: PathBuf, + files: HashMap, + watcher: FileWatcher, +} + +fn open_file(path: &Path) -> Result { + let mut file = File::open(path)?; + let mut data = Vec::new(); + file.read_to_end(&mut data)?; + let file_slice = FileSlice::from(data); + Ok(file_slice) +} + +impl RoRamDirectoryInner { + fn new(dir: &Path) -> Result { + // read all files in the directory + let mut files: HashMap = HashMap::new(); + for entry in std::fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); + if !entry.file_type()?.is_file() { + warn!("Skipping non-file {:?}", path); + continue; + } + let file_slice = open_file(&path)?; + files.insert(entry.file_name().into(), file_slice); + } + Ok(RoRamDirectoryInner { + root_path: dir.to_path_buf(), + files, + watcher: FileWatcher::new(&dir.join(*META_FILEPATH)), + }) + } + + fn open_read(&mut self, path: &Path) -> Result { + let slice = self.files.get(path).cloned(); + match slice { + Some(slice) => Ok(slice), + None => { + let full_path = self.root_path.join(path); + let file_slice = open_file(&full_path) + .map_err(|io_error| OpenReadError::wrap_io_error(io_error, full_path))?; + self.files.insert(path.to_path_buf(), file_slice.clone()); + Ok(file_slice) + } + } + } + + fn exists(&self, path: &Path) -> bool { + self.files.contains_key(path) + } + + fn watch(&self, watch_callback: super::WatchCallback) -> crate::Result { + Ok(self.watcher.watch(watch_callback)) + } + + fn delete(&mut self, path: &Path) { + self.files.remove(path); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Write; + + #[test] + fn test_ro_ram_directory() { + let dir = tempfile::tempdir().unwrap(); + let dir_path = dir.path(); + let file_name = Path::new("test.txt"); + let file_path = dir_path.join(file_name); + let mut file = File::create(&file_path).unwrap(); + file.write_all(b"hello world").unwrap(); + + let ram_dir = RoRamDirectory::new(dir_path).unwrap(); + + let file_slice = ram_dir.open_read(&file_name).unwrap(); + assert_eq!(file_slice.read_bytes().unwrap().as_slice(), b"hello world"); + + assert!(ram_dir.exists(&file_name).unwrap()); + } +}