From 755fe80fbb291fc8a7f6b6146c77ca4d3106aacc Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Wed, 13 Mar 2024 22:27:50 +0100 Subject: [PATCH] add stat calculations --- Cargo.toml | 10 +-- src/handle.rs | 4 +- src/main.rs | 2 +- src/segment/mod.rs | 36 ++++++-- src/segment/multi_writer.rs | 12 +-- src/segment/stats.rs | 38 ++++++++ src/segment/writer.rs | 11 ++- src/value_log.rs | 175 ++++++++++++++++++++++++++++++++---- tests/basic_gc.rs | 41 +++------ tests/basic_kv.rs | 14 ++- tests/space_amp.rs | 129 ++++++++++++++++++++++++++ 11 files changed, 390 insertions(+), 82 deletions(-) create mode 100644 src/segment/stats.rs create mode 100644 tests/space_amp.rs diff --git a/Cargo.toml b/Cargo.toml index 23c0ffc..1a96695 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,9 +19,8 @@ name = "value_log" path = "src/lib.rs" [features] -default = ["compression"] -compression = ["dep:lz4_flex"] -serde = ["dep:serde"] +default = ["lz4"] +lz4 = ["dep:lz4_flex"] [dependencies] byteorder = "1.5.0" @@ -36,7 +35,8 @@ serde = { version = "1.0.197", default-features = false, features = [ "alloc", "rc", "derive", -], optional = true } +] } +serde_json = { version = "1.0.114" } [dev-dependencies] criterion = "0.5.1" @@ -48,4 +48,4 @@ test-log = "0.2.15" name = "value_log" harness = false path = "benches/value_log.rs" -required-features = ["compression"] +required-features = ["lz4"] diff --git a/src/handle.rs b/src/handle.rs index b9f7411..851f3ef 100644 --- a/src/handle.rs +++ b/src/handle.rs @@ -1,10 +1,10 @@ +use serde::{Deserialize, Serialize}; use std::hash::Hash; use std::sync::Arc; /// A value handle points into the value log. #[allow(clippy::module_name_repetitions)] -#[derive(Clone, Debug, Eq, Hash, PartialEq)] -#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] +#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] pub struct ValueHandle { /// Segment ID pub segment_id: Arc, diff --git a/src/main.rs b/src/main.rs index 6eb91d1..37c8107 100644 --- a/src/main.rs +++ b/src/main.rs @@ -251,7 +251,7 @@ fn main() -> value_log::Result<()> { for _ in 0..10 { let value_handle = ValueHandle { - segment_id: value_log.list_segments().first().unwrap().clone(), + segment_id: value_log.list_segment_ids().first().unwrap().clone(), offset: 3, }; diff --git a/src/segment/mod.rs b/src/segment/mod.rs index 3381e3c..a2e9a08 100644 --- a/src/segment/mod.rs +++ b/src/segment/mod.rs @@ -1,11 +1,10 @@ -use std::{ - path::PathBuf, - sync::{atomic::AtomicU64, Arc}, -}; +use self::stats::Stats; +use std::{path::PathBuf, sync::Arc}; pub mod merge; pub mod multi_writer; pub mod reader; +pub mod stats; pub mod writer; /// A disk segment is an immutable, sorted, contiguous file @@ -21,12 +20,31 @@ pub struct Segment { /// Segment ID pub id: Arc, - /// asdasd + /// Segment path (folder) pub path: PathBuf, - /// asdasd - pub item_count: u64, + /// Segment statistics + pub stats: Stats, +} + +impl Segment { + /// Returns a scanner that can iterate through the segment + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn scan(&self) -> std::io::Result { + let path = self.path.join("data"); + reader::Reader::new(path, self.id.clone()) + } + + /// Always returns `false` + pub fn is_empty(&self) -> bool { + false + } - /// asdasd - pub stale_values: AtomicU64, + /// Returns the amount of items (dead or alive) in the segment + pub fn len(&self) -> u64 { + self.stats.item_count + } } diff --git a/src/segment/multi_writer.rs b/src/segment/multi_writer.rs index e7dc51e..6e2737a 100644 --- a/src/segment/multi_writer.rs +++ b/src/segment/multi_writer.rs @@ -7,7 +7,7 @@ use std::{ /// Segment writer, may write multiple segments pub struct MultiWriter { - folder: PathBuf, + root_folder: PathBuf, target_size: u64, writers: Vec, } @@ -22,12 +22,10 @@ impl MultiWriter { pub fn new>(target_size: u64, folder: P) -> std::io::Result { let folder = folder.as_ref(); let segment_id = generate_segment_id(); - - let segment_folder = folder.join("segments").join(&*segment_id); - let path = segment_folder.join("data"); + let path = folder.join("segments").join(&*segment_id).join("data"); Ok(Self { - folder: folder.into(), + root_folder: folder.into(), target_size, writers: vec![Writer::new(segment_id, path)?], }) @@ -63,11 +61,13 @@ impl MultiWriter { log::debug!("Rotating segment writer"); let new_segment_id = generate_segment_id(); + let path = self - .folder + .root_folder .join("segments") .join(&*new_segment_id) .join("data"); + self.writers.push(Writer::new(new_segment_id, path)?); Ok(()) diff --git a/src/segment/stats.rs b/src/segment/stats.rs new file mode 100644 index 0000000..9a59603 --- /dev/null +++ b/src/segment/stats.rs @@ -0,0 +1,38 @@ +use serde::{Deserialize, Serialize}; +use std::sync::atomic::AtomicU64; + +#[derive(Debug, Deserialize, Serialize)] +pub struct Stats { + pub(crate) item_count: u64, + pub(crate) dead_items: AtomicU64, + + pub total_bytes: u64, + pub(crate) dead_bytes: AtomicU64, + // TODO: key range +} + +impl Stats { + /// Returns the percent of dead items in the segment + pub fn dead_ratio(&self) -> f32 { + let dead = self.get_dead_items() as f32; + if dead == 0.0 { + return 0.0; + } + + dead / self.item_count as f32 + } + + /// Returns the amount of dead items in the segment + /// + /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`]. + pub fn get_dead_items(&self) -> u64 { + self.dead_items.load(std::sync::atomic::Ordering::Acquire) + } + + /// Returns the amount of dead bytes in the segment + /// + /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`]. + pub fn get_dead_bytes(&self) -> u64 { + self.dead_bytes.load(std::sync::atomic::Ordering::Acquire) + } +} diff --git a/src/segment/writer.rs b/src/segment/writer.rs index 72822fb..3d2961f 100644 --- a/src/segment/writer.rs +++ b/src/segment/writer.rs @@ -8,13 +8,15 @@ use std::{ /// Segment writer pub struct Writer { - pub(crate) path: PathBuf, + pub(crate) folder: PathBuf, pub(crate) segment_id: Arc, inner: BufWriter, offset: u64, pub(crate) item_count: u64, + + pub(crate) written_blob_bytes: u64, } impl Writer { @@ -32,11 +34,12 @@ impl Writer { let file = File::create(path)?; Ok(Self { - path: path.to_owned(), + folder: folder.into(), segment_id, inner: BufWriter::new(file), offset: 0, item_count: 0, + written_blob_bytes: 0, }) } @@ -81,7 +84,7 @@ impl Writer { /// /// Will return `Err` if an IO error occurs. pub fn write(&mut self, key: &[u8], value: &[u8]) -> std::io::Result { - #[cfg(feature = "compression")] + #[cfg(feature = "lz4")] let value = lz4_flex::compress_prepend_size(value); let mut hasher = crc32fast::Hasher::new(); @@ -94,6 +97,8 @@ impl Writer { self.inner.write_u32::(value.len() as u32)?; self.inner.write_all(&value)?; + self.written_blob_bytes += value.len() as u64; + // Key self.offset += std::mem::size_of::() as u64; self.offset += key.len() as u64; diff --git a/src/value_log.rs b/src/value_log.rs index bd560fb..bf27150 100644 --- a/src/value_log.rs +++ b/src/value_log.rs @@ -1,7 +1,7 @@ use crate::{ blob_cache::BlobCache, index::Writer as IndexWriter, - segment::{merge::MergeReader, multi_writer::MultiWriter}, + segment::{merge::MergeReader, multi_writer::MultiWriter, stats::Stats}, version::Version, Config, Index, Segment, SegmentReader, SegmentWriter, ValueHandle, }; @@ -101,9 +101,19 @@ impl ValueLog { }))) } + /// Gets a segment + #[must_use] + pub fn get_segment(&self, id: &Arc) -> Option> { + self.segments + .read() + .expect("lock is poisoned") + .get(id) + .cloned() + } + /// Lists all segment IDs #[must_use] - pub fn list_segments(&self) -> Vec> { + pub fn list_segment_ids(&self) -> Vec> { self.segments .read() .expect("lock is poisoned") @@ -112,6 +122,17 @@ impl ValueLog { .collect() } + /// Lists all segments + #[must_use] + pub fn list_segments(&self) -> Vec> { + self.segments + .read() + .expect("lock is poisoned") + .values() + .cloned() + .collect() + } + pub(crate) fn recover>( path: P, _index: Arc, @@ -154,7 +175,7 @@ impl ValueLog { return Ok(Some(value)); } - let mut reader = BufReader::new(File::open(&segment.path)?); + let mut reader = BufReader::new(File::open(segment.path.join("data"))?); reader.seek(std::io::SeekFrom::Start(handle.offset))?; let _crc = reader.read_u32::()?; @@ -164,7 +185,7 @@ impl ValueLog { let mut val = vec![0; val_len as usize]; reader.read_exact(&mut val)?; - #[cfg(feature = "compression")] + #[cfg(feature = "lz4")] let val = lz4_flex::decompress_size_prepended(&val).expect("should decompress"); // TODO: handle CRC @@ -183,14 +204,6 @@ impl ValueLog { handles.iter().map(|vr| self.get(vr)).collect() } */ - /// Sets the eligible-for-GC item count for a specific segment - pub fn set_stale_items(&self, id: &str, cnt: u64) { - if let Some(item) = self.segments.read().expect("lock is poisoned").get(id) { - item.stale_values - .store(cnt, std::sync::atomic::Ordering::Release); - }; - } - /// Initializes a new segment writer /// /// # Errors @@ -215,16 +228,19 @@ impl ValueLog { for writer in writers { let segment_id = writer.segment_id.clone(); - let path = writer.path.clone(); - let item_count = writer.item_count; + let segment_folder = writer.folder.clone(); lock.insert( segment_id.clone(), Arc::new(Segment { id: segment_id, - path, - item_count, - stale_values: AtomicU64::default(), + path: segment_folder, + stats: Stats { + item_count: writer.item_count, + total_bytes: writer.written_blob_bytes, + dead_items: AtomicU64::default(), + dead_bytes: AtomicU64::default(), + }, }), ); } @@ -232,6 +248,129 @@ impl ValueLog { Ok(()) } + /// Returns the amount of bytes that can be freed on disk + /// if all segments were to be defragmented + /// + /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`]. + #[must_use] + pub fn reclaimable_bytes(&self) -> u64 { + let segments = self.segments.read().expect("lock is poisoned"); + + let dead_bytes = segments + .values() + .map(|x| x.stats.get_dead_bytes()) + .sum::(); + drop(segments); + + dead_bytes + } + + /// Returns the percent of dead bytes in the value log + /// + /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`]. + #[must_use] + pub fn dead_ratio(&self) -> f32 { + let segments = self.segments.read().expect("lock is poisoned"); + + let used_bytes = segments.values().map(|x| x.stats.total_bytes).sum::(); + if used_bytes == 0 { + return 0.0; + } + + let dead_bytes = segments + .values() + .map(|x| x.stats.get_dead_bytes()) + .sum::(); + if dead_bytes == 0 { + return 0.0; + } + + drop(segments); + + dead_bytes as f32 / used_bytes as f32 + } + + /// Returns the approximate space amplification + /// + /// Returns 0.0 if there are no items. + /// + /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`]. + #[must_use] + pub fn space_amp(&self) -> f32 { + let segments = self.segments.read().expect("lock is poisoned"); + + let used_bytes = segments.values().map(|x| x.stats.total_bytes).sum::(); + if used_bytes == 0 { + return 0.0; + } + + let dead_bytes = segments + .values() + .map(|x| x.stats.get_dead_bytes()) + .sum::(); + + drop(segments); + + let alive_bytes = used_bytes - dead_bytes; + if alive_bytes == 0 { + return 0.0; + } + + used_bytes as f32 / alive_bytes as f32 + } + + /// Scans through a segment, refreshing its statistics + /// + /// This function is blocking. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + pub fn refresh_stats(&self, segment_id: &Arc) -> std::io::Result<()> { + let Some(segment) = self + .segments + .read() + .expect("lock is poisoned") + .get(segment_id) + .cloned() + else { + return Ok(()); + }; + + // Scan segment + let scanner = segment.scan()?; + + let mut dead_items = 0; + let mut dead_bytes = 0; + + for item in scanner { + let (key, val) = item?; + + if let Some(item) = self.index.get(&key)? { + // NOTE: Segment IDs are monotonically increasing + if item.segment_id > *segment_id { + dead_items += 1; + dead_bytes += val.len() as u64; + } + } else { + dead_items += 1; + dead_bytes += val.len() as u64; + } + } + + segment + .stats + .dead_items + .store(dead_items, std::sync::atomic::Ordering::Release); + + segment + .stats + .dead_bytes + .store(dead_bytes, std::sync::atomic::Ordering::Release); + + Ok(()) + } + /// Rewrites some segments into new segment(s), blocking the caller /// until the operation is completely done. /// @@ -261,7 +400,7 @@ impl ValueLog { let readers = segments .into_iter() - .map(|x| SegmentReader::new(&x.path, x.id.clone())) + .map(|x| x.scan()) .collect::>>()?; let reader = MergeReader::new(readers); diff --git a/tests/basic_gc.rs b/tests/basic_gc.rs index a077296..08c312a 100644 --- a/tests/basic_gc.rs +++ b/tests/basic_gc.rs @@ -2,6 +2,7 @@ use std::{ collections::BTreeMap, sync::{Arc, RwLock}, }; +use test_log::test; use value_log::{Config, Index, IndexWriter, ValueHandle, ValueLog}; type Inner = RwLock, ValueHandle>>; @@ -43,7 +44,7 @@ impl IndexWriter for DebugIndexWriter { } #[test] -fn basic_kv() -> value_log::Result<()> { +fn basic_gc() -> value_log::Result<()> { let folder = tempfile::tempdir()?; let index = DebugIndex(RwLock::new(BTreeMap::, ValueHandle>::default())); @@ -80,15 +81,8 @@ fn basic_kv() -> value_log::Result<()> { { let lock = value_log.segments.read().unwrap(); assert_eq!(1, lock.len()); - assert_eq!(5, lock.values().next().unwrap().item_count); - assert_eq!( - 0, - lock.values() - .next() - .unwrap() - .stale_values - .load(std::sync::atomic::Ordering::Relaxed), - ); + assert_eq!(5, lock.values().next().unwrap().len()); + assert_eq!(0, lock.values().next().unwrap().stats.get_dead_items()); } for (key, handle) in index.0.read().unwrap().iter() { @@ -123,15 +117,8 @@ fn basic_kv() -> value_log::Result<()> { { let lock = value_log.segments.read().unwrap(); assert_eq!(2, lock.len()); - assert_eq!(5, lock.values().next().unwrap().item_count); - assert_eq!( - 0, - lock.values() - .next() - .unwrap() - .stale_values - .load(std::sync::atomic::Ordering::Relaxed), - ); + assert_eq!(5, lock.values().next().unwrap().len()); + assert_eq!(0, lock.values().next().unwrap().stats.get_dead_items()); } for (key, handle) in index.0.read().unwrap().iter() { @@ -139,20 +126,16 @@ fn basic_kv() -> value_log::Result<()> { assert_eq!(item, key.repeat(1_000).into()); } - value_log.rollover(&value_log.list_segments(), &DebugIndexWriter(index.clone()))?; + value_log.rollover( + &value_log.list_segment_ids(), + &DebugIndexWriter(index.clone()), + )?; { let lock = value_log.segments.read().unwrap(); assert_eq!(1, lock.len()); - assert_eq!(5, lock.values().next().unwrap().item_count); - assert_eq!( - 0, - lock.values() - .next() - .unwrap() - .stale_values - .load(std::sync::atomic::Ordering::Relaxed), - ); + assert_eq!(5, lock.values().next().unwrap().len()); + assert_eq!(0, lock.values().next().unwrap().stats.get_dead_items()); } Ok(()) diff --git a/tests/basic_kv.rs b/tests/basic_kv.rs index 0266d02..ffdc41b 100644 --- a/tests/basic_kv.rs +++ b/tests/basic_kv.rs @@ -2,6 +2,7 @@ use std::{ collections::BTreeMap, sync::{Arc, RwLock}, }; +use test_log::test; use value_log::{Config, Index, ValueHandle, ValueLog}; type Inner = RwLock, ValueHandle>>; @@ -69,15 +70,10 @@ fn basic_kv() -> value_log::Result<()> { { let lock = value_log.segments.read().unwrap(); assert_eq!(1, lock.len()); - assert_eq!(items.len() as u64, lock.values().next().unwrap().item_count); - assert_eq!( - 0, - lock.values() - .next() - .unwrap() - .stale_values - .load(std::sync::atomic::Ordering::Relaxed), - ); + + let segment = lock.values().next().unwrap(); + assert_eq!(items.len() as u64, segment.len()); + assert_eq!(0, segment.stats.get_dead_items()); } for (key, handle) in index.0.read().unwrap().iter() { diff --git a/tests/space_amp.rs b/tests/space_amp.rs new file mode 100644 index 0000000..e08a447 --- /dev/null +++ b/tests/space_amp.rs @@ -0,0 +1,129 @@ +use std::{ + collections::BTreeMap, + sync::{Arc, RwLock}, +}; +use test_log::test; +use value_log::{Config, Index, ValueHandle, ValueLog}; + +type Inner = RwLock, ValueHandle>>; + +#[derive(Default)] +pub struct DebugIndex(Inner); + +impl std::ops::Deref for DebugIndex { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Index for DebugIndex { + fn get(&self, key: &[u8]) -> std::io::Result> { + Ok(self.read().expect("lock is poisoned").get(key).cloned()) + } + + fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()> { + self.write() + .expect("lock is poisoned") + .insert(key.into(), value); + + Ok(()) + } +} + +#[test] +fn worst_case_space_amp() -> value_log::Result<()> { + let folder = tempfile::tempdir()?; + + let index = DebugIndex(RwLock::new(BTreeMap::, ValueHandle>::default())); + let index = Arc::new(index); + + let vl_path = folder.path(); + std::fs::create_dir_all(vl_path)?; + let value_log = ValueLog::new(vl_path, Config::default(), index.clone())?; + + assert_eq!(0.0, value_log.space_amp()); + assert_eq!(0.0, value_log.dead_ratio()); + + let key = "key"; + let value = "value"; + + // NOTE: Write a single item 10x + // -> should result in space amp = 10.0x + for x in 1..=10 { + let mut writer = value_log.get_writer()?; + let segment_id = writer.segment_id(); + + let offset = writer.offset(key.as_bytes()); + + index.insert_indirection( + key.as_bytes(), + ValueHandle { + offset, + segment_id: segment_id.clone(), + }, + )?; + + writer.write(key.as_bytes(), value.as_bytes())?; + value_log.register(writer)?; + + for id in value_log.list_segment_ids() { + value_log.refresh_stats(&id)?; + } + + assert_eq!(x as f32, value_log.space_amp()); + + if x > 1 { + assert!((1.0 - (1.0 / x as f32) - value_log.dead_ratio()) < 0.00001); + } + } + + Ok(()) +} + +#[test] +fn no_overlap_space_amp() -> value_log::Result<()> { + let folder = tempfile::tempdir()?; + + let index = DebugIndex(RwLock::new(BTreeMap::, ValueHandle>::default())); + let index = Arc::new(index); + + let vl_path = folder.path(); + std::fs::create_dir_all(vl_path)?; + let value_log = ValueLog::new(vl_path, Config::default(), index.clone())?; + + assert_eq!(0.0, value_log.dead_ratio()); + assert_eq!(0.0, value_log.space_amp()); + + // NOTE: No blobs overlap, so there are no dead blobs => space amp = 1.0 => perfect space amp + for i in 0..100 { + let key = i.to_string(); + let value = "afsasfdfasdfsda"; + + let mut writer = value_log.get_writer()?; + let segment_id = writer.segment_id(); + + let offset = writer.offset(key.as_bytes()); + + index.insert_indirection( + key.as_bytes(), + ValueHandle { + offset, + segment_id: segment_id.clone(), + }, + )?; + + writer.write(key.as_bytes(), value.as_bytes())?; + value_log.register(writer)?; + + for id in value_log.list_segment_ids() { + value_log.refresh_stats(&id)?; + } + + assert_eq!(1.0, value_log.space_amp()); + assert_eq!(0.0, value_log.dead_ratio()); + } + + Ok(()) +}