Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a read-only RAM directory to load an index into memory #6

Open
wants to merge 1 commit into
base: 0.21.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions src/core/tests.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use std::path::PathBuf;

use tempfile::TempDir;

use crate::collector::Count;
use crate::directory::{RamDirectory, WatchCallback};
use crate::indexer::NoMergePolicy;
use crate::directory::{RamDirectory, RoRamDirectory, WatchCallback};
use crate::indexer::{LogMergePolicy, NoMergePolicy};
use crate::postings::Postings;
use crate::query::TermQuery;
use crate::schema::{Field, IndexRecordOption, Schema, INDEXED, STRING, TEXT};
use crate::tokenizer::TokenizerManager;
use crate::{
Directory, Document, Index, IndexBuilder, IndexReader, IndexSettings, ReloadPolicy, SegmentId,
Term,
Directory, Document, Index, IndexBuilder, IndexReader, IndexSettings, IndexWriter, ReloadPolicy, SegmentId, Term
};

#[test]
Expand Down Expand Up @@ -344,3 +348,35 @@ fn test_merging_segment_update_docfreq() {
let term_info = inv_index.get_term_info(&term).unwrap().unwrap();
assert_eq!(term_info.doc_freq, 12);
}

/// Checks that an index opened through a `RoRamDirectory` observes commits made
/// by a writer working on the same on-disk directory.
///
/// The reader uses `ReloadPolicy::Manual`, so a new commit must only become
/// visible after an explicit `reload()`.
#[test]
fn test_read_only_dir() -> crate::Result<()> {
    let schema = throw_away_schema();
    let field = schema.get_field("num_likes").unwrap();
    let tempdir = TempDir::new().unwrap();
    let tempdir_path = PathBuf::from(tempdir.path());

    // Create an empty index on disk and commit once so that the index files exist
    // before the read-only directory loads them.
    let write_index = Index::create_in_dir(&tempdir_path, schema.clone())?;
    let mut writer: IndexWriter = write_index.writer_for_tests()?;
    writer.commit()?;

    // Open the same directory through the in-memory, read-only directory.
    let dir = RoRamDirectory::new(&tempdir_path)?;
    let mut index = Index::open_or_create(dir, schema)?;
    let reader = index
        .reader_builder()
        .reload_policy(ReloadPolicy::Manual)
        .try_into()?;
    assert_eq!(reader.searcher().num_docs(), 0);

    // Register a watch callback so the test can wait for the commit to be noticed.
    writer.add_document(doc!(field=>1u64))?;
    let (sender, receiver) = crossbeam_channel::unbounded();
    let _handle = index.directory_mut().watch(WatchCallback::new(move || {
        let _ = sender.send(());
    }));
    writer.commit()?;
    assert!(receiver.recv().is_ok());

    // Manual reload policy: the new document only shows up after `reload()`.
    assert_eq!(reader.searcher().num_docs(), 0);
    reader.reload()?;
    assert_eq!(reader.searcher().num_docs(), 1);
    Ok(())
}
6 changes: 3 additions & 3 deletions src/directory/mmap_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,9 @@ impl MmapDirectory {
/// uses BSD locks (`flock`). The lock is actually released when
/// the `File` object is dropped and its associated file descriptor
/// is closed.
pub(crate) struct ReleaseLockFile {
    // Open handle on the lock file. It is never read; it is only kept so that
    // the lock stays held until this value is dropped.
    pub(crate) _file: File,
    // Path associated with the lock file.
    pub(crate) path: PathBuf,
}

impl Drop for ReleaseLockFile {
Expand Down
2 changes: 2 additions & 0 deletions src/directory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod footer;
mod managed_directory;
mod ram_directory;
mod watch_event_router;
mod roram_directory;

/// Errors specific to the directory module.
pub mod error;
Expand All @@ -27,6 +28,7 @@ pub use self::directory::{Directory, DirectoryClone, DirectoryLock};
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
pub use self::ram_directory::RamDirectory;
pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle};
pub use self::roram_directory::RoRamDirectory;

/// Outcome of the Garbage collection
pub struct GarbageCollectionResult {
Expand Down
212 changes: 212 additions & 0 deletions src/directory/roram_directory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
use std::{
collections::HashMap,
fs::{File, OpenOptions},
io::{self, Read, Write},
path::{Path, PathBuf},
sync::{Arc, RwLock},
};

use common::file_slice::FileSlice;
use fs4::FileExt;

use crate::core::META_FILEPATH;

use super::{
error::{LockError, OpenReadError, OpenWriteError},
file_watcher::FileWatcher,
mmap_directory::ReleaseLockFile,
Directory, DirectoryLock, META_LOCK,
};

/// RoRamDirectory is a read only directory that stores data in RAM.
///
/// The regular files found in the source directory are read into memory when the
/// directory is created; a file that is requested later and is not in memory yet
/// is read from disk on first access and then kept in memory.
///
/// Note: please make sure the index files exist before creating a RoRamDirectory.
///
/// Clones share the same underlying state through an `Arc`.
#[derive(Clone)]
pub struct RoRamDirectory {
// Shared, lock-protected state: root path, in-memory files and the meta file watcher.
inner: Arc<RwLock<RoRamDirectoryInner>>,
}

impl RoRamDirectory {
    /// Creates a `RoRamDirectory` serving the files found in `dir`.
    ///
    /// # Errors
    ///
    /// Returns the `std::io::Error` reported while building the inner state
    /// (listing `dir` and reading its files).
    pub fn new(dir: &Path) -> Result<RoRamDirectory, std::io::Error> {
        RoRamDirectoryInner::new(dir).map(|state| Self {
            inner: Arc::new(RwLock::new(state)),
        })
    }
}

impl std::fmt::Debug for RoRamDirectory {
    /// Writes only the type name; none of the internal state is printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("RoRamDirectory")
    }
}

impl Directory for RoRamDirectory {
fn get_file_handle(
&self,
path: &std::path::Path,
) -> Result<std::sync::Arc<dyn common::file_slice::FileHandle>, super::error::OpenReadError>
{
let file_slice = self.open_read(path)?;
Ok(Arc::new(file_slice))
}

fn open_read(&self, path: &Path) -> Result<FileSlice, super::error::OpenReadError> {
self.inner.write().unwrap().open_read(path)
}

fn delete(&self, path: &std::path::Path) -> Result<(), super::error::DeleteError> {
self.inner.write().unwrap().delete(path);
Ok(())
}

fn exists(&self, path: &std::path::Path) -> Result<bool, super::error::OpenReadError> {
Ok(self.inner.read().unwrap().exists(path))
}

fn open_write(
&self,
path: &std::path::Path,
) -> Result<super::WritePtr, super::error::OpenWriteError> {
unimplemented!()
}

fn acquire_lock(
&self,
lock: &super::Lock,
) -> Result<super::DirectoryLock, super::error::LockError> {
let full_path = self.inner.read().unwrap().root_path.join(&lock.filepath);
// We make sure that the file exists.
let file: File = OpenOptions::new()
.write(true)
.create(true) //< if the file does not exist yet, create it.
.truncate(false)
.open(full_path)
.map_err(LockError::wrap_io_error)?;
if lock.is_blocking {
file.lock_exclusive().map_err(LockError::wrap_io_error)?;
} else {
file.try_lock_exclusive().map_err(|_| LockError::LockBusy)?
}
// dropping the file handle will release the lock.
Ok(DirectoryLock::from(Box::new(ReleaseLockFile {
path: lock.filepath.clone(),
_file: file,
})))
}

fn atomic_read(&self, path: &std::path::Path) -> Result<Vec<u8>, super::error::OpenReadError> {
let full_path = self.inner.read().unwrap().root_path.join(path);
let mut buffer = Vec::new();
match File::open(full_path) {
Ok(mut file) => {
file.read_to_end(&mut buffer).map_err(|io_error| {
OpenReadError::wrap_io_error(io_error, path.to_path_buf())
})?;
Ok(buffer)
}
Err(io_error) => {
if io_error.kind() == io::ErrorKind::NotFound {
Err(OpenReadError::FileDoesNotExist(path.to_owned()))
} else {
Err(OpenReadError::wrap_io_error(io_error, path.to_path_buf()))
}
}
}
}

fn atomic_write(&self, _path: &std::path::Path, _data: &[u8]) -> std::io::Result<()> {
unimplemented!("RoRamDirectory is read-only")
}

fn sync_directory(&self) -> std::io::Result<()> {
Ok(())
}

fn watch(&self, watch_callback: super::WatchCallback) -> crate::Result<super::WatchHandle> {
self.inner.read().unwrap().watch(watch_callback)
}
}

/// State behind the lock of a `RoRamDirectory`.
struct RoRamDirectoryInner {
/// Directory on disk the files are read from.
root_path: PathBuf,
/// In-memory file contents. Files loaded at creation are keyed by their file
/// name; files loaded later are keyed by the path passed to `open_read`.
files: HashMap<PathBuf, FileSlice>,
/// Watcher on the meta file (`META_FILEPATH` joined to the root directory).
watcher: FileWatcher,
}

/// Reads the whole file at `path` into memory and returns it as a `FileSlice`.
///
/// # Errors
///
/// Returns the `std::io::Error` raised while opening or reading the file.
fn open_file(path: &Path) -> Result<FileSlice, std::io::Error> {
    // `std::fs::read` opens the file and reads it to the end in a single call,
    // replacing the manual `File::open` + `read_to_end` sequence.
    std::fs::read(path).map(FileSlice::from)
}

impl RoRamDirectoryInner {
fn new(dir: &Path) -> Result<RoRamDirectoryInner, std::io::Error> {
// read all files in the directory
let mut files: HashMap<PathBuf, FileSlice> = HashMap::new();
for entry in std::fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
if !entry.file_type()?.is_file() {
warn!("Skipping non-file {:?}", path);
continue;
}
let file_slice = open_file(&path)?;
files.insert(entry.file_name().into(), file_slice);
}
Ok(RoRamDirectoryInner {
root_path: dir.to_path_buf(),
files,
watcher: FileWatcher::new(&dir.join(*META_FILEPATH)),
})
}

fn open_read(&mut self, path: &Path) -> Result<FileSlice, super::error::OpenReadError> {
let slice = self.files.get(path).cloned();
match slice {
Some(slice) => Ok(slice),
None => {
let full_path = self.root_path.join(path);
let file_slice = open_file(&full_path)
.map_err(|io_error| OpenReadError::wrap_io_error(io_error, full_path))?;
self.files.insert(path.to_path_buf(), file_slice.clone());
Ok(file_slice)
}
}
}

fn exists(&self, path: &Path) -> bool {
self.files.contains_key(path)
}

fn watch(&self, watch_callback: super::WatchCallback) -> crate::Result<super::WatchHandle> {
Ok(self.watcher.watch(watch_callback))
}

fn delete(&mut self, path: &Path) {
self.files.remove(path);
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises the three read paths of `RoRamDirectory`: a file loaded at
    /// creation, a file loaded lazily on first access, and a missing file.
    #[test]
    fn test_ro_ram_directory() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let file_name = Path::new("test.txt");
        std::fs::write(dir_path.join(file_name), b"hello world").unwrap();

        let ram_dir = RoRamDirectory::new(dir_path).unwrap();

        // A file present at creation time is served from memory.
        let file_slice = ram_dir.open_read(file_name).unwrap();
        assert_eq!(file_slice.read_bytes().unwrap().as_slice(), b"hello world");
        assert!(ram_dir.exists(file_name).unwrap());

        // A file created afterwards is loaded from disk on first access and is
        // then kept in memory.
        let late_name = Path::new("late.txt");
        std::fs::write(dir_path.join(late_name), b"late").unwrap();
        let late_slice = ram_dir.open_read(late_name).unwrap();
        assert_eq!(late_slice.read_bytes().unwrap().as_slice(), b"late");
        assert!(ram_dir.exists(late_name).unwrap());

        // A file that exists nowhere is reported as absent and cannot be opened.
        let missing_name = Path::new("missing.txt");
        assert!(!ram_dir.exists(missing_name).unwrap());
        assert!(ram_dir.open_read(missing_name).is_err());
    }
}