Skip to content

Commit

Permalink
inversion of inversion of control
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Jun 17, 2024
1 parent af43ec3 commit b82c80f
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 88 deletions.
7 changes: 0 additions & 7 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,6 @@ pub trait Reader {
/// no written value handles should be handed out by the index.
/// When `finish` fails, no value handles should be written into the index.
pub trait Writer {
/// Inserts a value directly into the index write batch.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
fn insert_direct(&mut self, key: &[u8], value: &[u8]) -> std::io::Result<()>;

/// Inserts a value handle into the index write batch.
///
/// # Errors
Expand Down
10 changes: 4 additions & 6 deletions src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
id::SegmentId,
key_range::KeyRange,
segment::{gc_stats::GcStats, meta::Metadata, trailer::SegmentFileTrailer},
IndexWriter, Segment, SegmentWriter as MultiWriter,
Segment, SegmentWriter as MultiWriter,
};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::{
Expand Down Expand Up @@ -184,8 +184,8 @@ impl SegmentManifest {
})
}

pub fn register<W: IndexWriter>(&self, writer: MultiWriter<W>) -> crate::Result<W> {
let (writers, mut index_writer) = writer.finish()?;
pub fn register(&self, writer: MultiWriter) -> crate::Result<()> {
let writers = writer.finish()?;

self.atomic_swap(move |recipe| {
for writer in writers {
Expand Down Expand Up @@ -240,9 +240,7 @@ impl SegmentManifest {
// NOTE: If we crash before finishing the index write, it's fine
// because all new segments will be unreferenced, and thus can be dropped as stale

index_writer.finish()?;

Ok(index_writer)
Ok(())
}

fn write_to_disk<P: AsRef<Path>>(path: P, segment_ids: &[SegmentId]) -> crate::Result<()> {
Expand Down
4 changes: 0 additions & 4 deletions src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ impl IndexReader for MockIndex {
pub struct MockIndexWriter(pub MockIndex);

impl IndexWriter for MockIndexWriter {
fn insert_direct(&mut self, _key: &[u8], _value: &[u8]) -> std::io::Result<()> {
Ok(())
}

fn insert_indirect(
&mut self,
key: &[u8],
Expand Down
66 changes: 18 additions & 48 deletions src/segment/multi_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,26 @@ use super::writer::Writer;
use crate::{
compression::Compressor,
id::{IdGenerator, SegmentId},
IndexWriter, ValueHandle,
ValueHandle,
};
use std::{
path::{Path, PathBuf},
sync::Arc,
};

/// Segment writer, may write multiple segments
pub struct MultiWriter<W: IndexWriter> {
pub struct MultiWriter {
folder: PathBuf,
target_size: u64,

writers: Vec<Writer>,
pub(crate) index_writer: W, // TODO: only need a (mutable) reference??

id_generator: IdGenerator,

compression: Option<Arc<dyn Compressor>>,

blob_separation_size: usize,
}

impl<W: IndexWriter> MultiWriter<W> {
impl MultiWriter {
/// Initializes a new segment writer.
///
/// # Errors
Expand All @@ -35,7 +32,6 @@ impl<W: IndexWriter> MultiWriter<W> {
id_generator: IdGenerator,
target_size: u64,
folder: P,
index_writer: W,
) -> std::io::Result<Self> {
let folder = folder.as_ref();

Expand All @@ -48,20 +44,11 @@ impl<W: IndexWriter> MultiWriter<W> {
target_size,

writers: vec![Writer::new(segment_path, segment_id)?],
index_writer,

compression: None,
blob_separation_size: 2_048,
})
}

/// Sets the separation threshold for blobs
#[must_use]
pub fn blob_separation_size(mut self, bytes: usize) -> Self {
self.blob_separation_size = bytes;
self
}

/// Sets the compression method
#[must_use]
pub fn use_compression(mut self, compressor: Arc<dyn Compressor>) -> Self {
Expand All @@ -70,28 +57,35 @@ impl<W: IndexWriter> MultiWriter<W> {
self
}

fn get_active_writer(&self) -> &Writer {
#[doc(hidden)]
pub fn get_active_writer(&self) -> &Writer {
self.writers.last().expect("should exist")
}

fn get_active_writer_mut(&mut self) -> &mut Writer {
self.writers.last_mut().expect("should exist")
}

/// Returns the current offset in the file.
/// Returns the [`ValueHandle`] for the next written blob.
///
/// This can be used to index an item into an external `Index`.
pub fn get_next_value_handle(&self, key: &[u8]) -> ValueHandle {
ValueHandle {
offset: self.offset(key),
segment_id: self.segment_id(),
}
}

#[must_use]
pub(crate) fn offset(&self, key: &[u8]) -> u64 {
fn offset(&self, key: &[u8]) -> u64 {
self.get_active_writer().offset()
// NOTE: Point to the value record, not the key
// The key is not really needed when dereferencing a value handle
+ std::mem::size_of::<u16>() as u64 + key.len() as u64
}

/// Returns the segment ID
#[must_use]
pub(crate) fn segment_id(&self) -> SegmentId {
fn segment_id(&self) -> SegmentId {
self.get_active_writer().segment_id()
}

Expand Down Expand Up @@ -128,33 +122,9 @@ impl<W: IndexWriter> MultiWriter<W> {

let target_size = self.target_size;

// Give value handle to index writer
let bytes_written = if value.len() >= self.blob_separation_size {
let segment_id = self.segment_id();
let offset = self.offset(key);
let vhandle = ValueHandle { segment_id, offset };

log::trace!(
"inserting indirection: {vhandle:?} => {:?}",
String::from_utf8_lossy(key)
);

self.index_writer
.insert_indirect(key, vhandle, value.len() as u32)?;

// Write actual value into segment
let writer = self.get_active_writer_mut();

writer.write(key, value)?
} else {
log::trace!("GC: inserting direct: {:?}", String::from_utf8_lossy(key));
self.index_writer.insert_direct(key, value)?;

0
};

// Write actual value into segment
let writer = self.get_active_writer_mut();
let bytes_written = writer.write(key, value)?;

// Check for segment size target, maybe rotate to next writer
if writer.offset() >= target_size {
Expand All @@ -165,7 +135,7 @@ impl<W: IndexWriter> MultiWriter<W> {
Ok(bytes_written)
}

pub(crate) fn finish(mut self) -> crate::Result<(Vec<Writer>, W)> {
pub(crate) fn finish(mut self) -> crate::Result<Vec<Writer>> {
let writer = self.get_active_writer_mut();

if writer.item_count > 0 {
Expand All @@ -175,6 +145,6 @@ impl<W: IndexWriter> MultiWriter<W> {
// IMPORTANT: We cannot finish the index writer here
// The writers first need to be registered into the value log

Ok((self.writers, self.index_writer))
Ok(self.writers)
}
}
6 changes: 3 additions & 3 deletions src/segment/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{

/// Segment writer
pub struct Writer {
pub(crate) path: PathBuf,
pub path: PathBuf,
pub(crate) segment_id: SegmentId,

#[allow(clippy::struct_field_names)]
Expand Down Expand Up @@ -68,13 +68,13 @@ impl Writer {
///
/// This can be used to index an item into an external `Index`.
#[must_use]
pub fn offset(&self) -> u64 {
pub(crate) fn offset(&self) -> u64 {
self.offset
}

/// Returns the segment ID
#[must_use]
pub fn segment_id(&self) -> SegmentId {
pub(crate) fn segment_id(&self) -> SegmentId {
self.segment_id
}

Expand Down
44 changes: 24 additions & 20 deletions src/value_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct ValueLogInner {
id: u64,

/// Base folder
path: PathBuf,
pub path: PathBuf,

/// Value log configuration
config: Config,
Expand Down Expand Up @@ -173,8 +173,9 @@ impl ValueLog {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn register_writer<W: IndexWriter>(&self, writer: SegmentWriter<W>) -> crate::Result<W> {
self.manifest.register(writer)
pub fn register_writer(&self, writer: SegmentWriter) -> crate::Result<()> {
self.manifest.register(writer)?;
Ok(())
}

/// Returns segment count
Expand Down Expand Up @@ -207,17 +208,7 @@ impl ValueLog {
let mut value = vec![0; val_len as usize];
reader.read_exact(&mut value)?;

/* let value = match segment.meta.compression {
crate::CompressionType::None => value,
#[cfg(feature = "lz4")]
crate::CompressionType::Lz4 => lz4_flex::decompress_size_prepended(&value)
.map_err(|_| crate::Error::Decompress(segment.meta.compression))?,
#[cfg(feature = "miniz")]
crate::CompressionType::Miniz(_) => miniz_oxide::inflate::decompress_to_vec(&value)
.map_err(|_| crate::Error::Decompress(segment.meta.compression))?,
}; */
let value = self.config.compression.decompress(&value)?;

// TODO: handle CRC

Expand All @@ -234,12 +225,11 @@ impl ValueLog {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn get_writer<W: IndexWriter>(&self, index_writer: W) -> crate::Result<SegmentWriter<W>> {
pub fn get_writer(&self) -> crate::Result<SegmentWriter> {
Ok(SegmentWriter::new(
self.id_generator.clone(),
self.config.segment_size_bytes,
self.path.join(SEGMENTS_FOLDER),
index_writer,
)?
.use_compression(self.config.compression.clone()))
}
Expand Down Expand Up @@ -314,6 +304,9 @@ impl ValueLog {
///
/// Will return `Err` if an IO error occurs.
pub fn drop_stale_segments(&self) -> crate::Result<()> {
// IMPORTANT: Only allow 1 rollover or GC at any given time
let _guard = self.rollover_guard.lock().expect("lock is poisoned");

let ids = self
.manifest
.segments
Expand All @@ -324,7 +317,7 @@ impl ValueLog {
.map(|x| x.id)
.collect::<Vec<_>>();

log::debug!("Dropping blob files: {ids:?}");
log::info!("Dropping stale blob files: {ids:?}");
self.manifest.drop_segments(&ids)?;

Ok(())
Expand Down Expand Up @@ -366,6 +359,7 @@ impl ValueLog {
&self,
iter: impl Iterator<Item = std::io::Result<(ValueHandle, u32)>>,
) -> crate::Result<()> {
#[derive(Debug)]
struct SegmentCounter {
size: u64,
item_count: u64,
Expand Down Expand Up @@ -399,6 +393,8 @@ impl ValueLog {
});
}

eprintln!("{size_map:?}");

for (&id, counter) in &size_map {
let used_size = counter.size;
let alive_item_count = counter.item_count;
Expand Down Expand Up @@ -487,7 +483,7 @@ impl ValueLog {
&self,
ids: &[u64],
index_reader: &R,
index_writer: W,
mut index_writer: W,
) -> crate::Result<()> {
if ids.is_empty() {
return Ok(());
Expand All @@ -514,8 +510,9 @@ impl ValueLog {

let reader = MergeReader::new(readers);

// IMPORTANT: Set separation size to 0 just in case
let mut writer = self.get_writer(index_writer)?.blob_separation_size(0);
let mut writer = self
.get_writer()?
.use_compression(self.config.compression.clone());

for item in reader {
let (k, v, segment_id) = item?;
Expand All @@ -527,13 +524,20 @@ impl ValueLog {
_ => {}
}

let vhandle = writer.get_next_value_handle(&k);
index_writer.insert_indirect(&k, vhandle, v.len() as u32)?;

writer.write(&k, &v)?;
}

// IMPORTANT: New segments need to be persisted before adding to index
// to avoid dangling pointers
self.manifest.register(writer)?;

// NOTE: If we crash here, it's fine, the segments are registered
// but never referenced, so they can just be dropped after recovery
index_writer.finish()?;

// IMPORTANT: We only mark the segments as definitely stale
// The external index needs to decide when it is safe to drop
// the old segments, as some reads may still be performed
Expand Down

0 comments on commit b82c80f

Please sign in to comment.