Skip to content

Commit

Permalink
add stat calculations
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Mar 13, 2024
1 parent e23fe15 commit 755fe80
Show file tree
Hide file tree
Showing 11 changed files with 390 additions and 82 deletions.
10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ name = "value_log"
path = "src/lib.rs"

[features]
default = ["compression"]
compression = ["dep:lz4_flex"]
serde = ["dep:serde"]
default = ["lz4"]
lz4 = ["dep:lz4_flex"]

[dependencies]
byteorder = "1.5.0"
Expand All @@ -36,7 +35,8 @@ serde = { version = "1.0.197", default-features = false, features = [
"alloc",
"rc",
"derive",
], optional = true }
] }
serde_json = { version = "1.0.114" }

[dev-dependencies]
criterion = "0.5.1"
Expand All @@ -48,4 +48,4 @@ test-log = "0.2.15"
name = "value_log"
harness = false
path = "benches/value_log.rs"
required-features = ["compression"]
required-features = ["lz4"]
4 changes: 2 additions & 2 deletions src/handle.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use serde::{Deserialize, Serialize};
use std::hash::Hash;
use std::sync::Arc;

/// A value handle points into the value log.
#[allow(clippy::module_name_repetitions)]
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct ValueHandle {
/// Segment ID
pub segment_id: Arc<str>,
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ fn main() -> value_log::Result<()> {

for _ in 0..10 {
let value_handle = ValueHandle {
segment_id: value_log.list_segments().first().unwrap().clone(),
segment_id: value_log.list_segment_ids().first().unwrap().clone(),
offset: 3,
};

Expand Down
36 changes: 27 additions & 9 deletions src/segment/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::{
path::PathBuf,
sync::{atomic::AtomicU64, Arc},
};
use self::stats::Stats;
use std::{path::PathBuf, sync::Arc};

pub mod merge;
pub mod multi_writer;
pub mod reader;
pub mod stats;
pub mod writer;

/// A disk segment is an immutable, sorted, contiguous file
Expand All @@ -21,12 +20,31 @@ pub struct Segment {
/// Segment ID
pub id: Arc<str>,

/// asdasd
/// Segment path (folder)
pub path: PathBuf,

/// asdasd
pub item_count: u64,
/// Segment statistics
pub stats: Stats,
}

impl Segment {
/// Returns a scanner that can iterate through the segment
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn scan(&self) -> std::io::Result<reader::Reader> {
let path = self.path.join("data");
reader::Reader::new(path, self.id.clone())
}

/// Always returns `false`
pub fn is_empty(&self) -> bool {
false
}

/// asdasd
pub stale_values: AtomicU64,
/// Returns the amount of items (dead or alive) in the segment
pub fn len(&self) -> u64 {
self.stats.item_count
}
}
12 changes: 6 additions & 6 deletions src/segment/multi_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{

/// Segment writer, may write multiple segments
pub struct MultiWriter {
folder: PathBuf,
root_folder: PathBuf,
target_size: u64,
writers: Vec<Writer>,
}
Expand All @@ -22,12 +22,10 @@ impl MultiWriter {
pub fn new<P: AsRef<Path>>(target_size: u64, folder: P) -> std::io::Result<Self> {
let folder = folder.as_ref();
let segment_id = generate_segment_id();

let segment_folder = folder.join("segments").join(&*segment_id);
let path = segment_folder.join("data");
let path = folder.join("segments").join(&*segment_id).join("data");

Ok(Self {
folder: folder.into(),
root_folder: folder.into(),
target_size,
writers: vec![Writer::new(segment_id, path)?],
})
Expand Down Expand Up @@ -63,11 +61,13 @@ impl MultiWriter {
log::debug!("Rotating segment writer");

let new_segment_id = generate_segment_id();

let path = self
.folder
.root_folder
.join("segments")
.join(&*new_segment_id)
.join("data");

self.writers.push(Writer::new(new_segment_id, path)?);

Ok(())
Expand Down
38 changes: 38 additions & 0 deletions src/segment/stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use serde::{Deserialize, Serialize};
use std::sync::atomic::AtomicU64;

/// Runtime statistics for a single value log segment.
///
/// `item_count` and `total_bytes` are fixed when the segment is written;
/// the `dead_*` counters are refreshed later (see `refresh_stats`) and are
/// therefore stored as atomics so they can be updated through a shared
/// reference.
#[derive(Debug, Deserialize, Serialize)]
pub struct Stats {
    // Number of items (dead or alive) in the segment; fixed at write time
    pub(crate) item_count: u64,
    // Number of items known to be stale; updated atomically on stats refresh
    pub(crate) dead_items: AtomicU64,

    /// Total size of the segment payload in bytes
    pub total_bytes: u64,
    // Bytes occupied by stale items; updated atomically on stats refresh
    pub(crate) dead_bytes: AtomicU64,
    // TODO: key range
}

impl Stats {
    /// Returns the ratio (0.0 ..= 1.0) of dead items in the segment
    pub fn dead_ratio(&self) -> f32 {
        // Guard on the denominator, not the numerator: an empty segment
        // must yield 0.0 rather than NaN (0/0) or infinity (x/0), even if
        // the dead counter is (inconsistently) non-zero.
        if self.item_count == 0 {
            return 0.0;
        }

        self.get_dead_items() as f32 / self.item_count as f32
    }

    /// Returns the amount of dead items in the segment
    ///
    /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`].
    pub fn get_dead_items(&self) -> u64 {
        self.dead_items.load(std::sync::atomic::Ordering::Acquire)
    }

    /// Returns the amount of dead bytes in the segment
    ///
    /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`].
    pub fn get_dead_bytes(&self) -> u64 {
        self.dead_bytes.load(std::sync::atomic::Ordering::Acquire)
    }
}
11 changes: 8 additions & 3 deletions src/segment/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ use std::{

/// Segment writer
pub struct Writer {
pub(crate) path: PathBuf,
pub(crate) folder: PathBuf,
pub(crate) segment_id: Arc<str>,

inner: BufWriter<File>,

offset: u64,
pub(crate) item_count: u64,

pub(crate) written_blob_bytes: u64,
}

impl Writer {
Expand All @@ -32,11 +34,12 @@ impl Writer {
let file = File::create(path)?;

Ok(Self {
path: path.to_owned(),
folder: folder.into(),
segment_id,
inner: BufWriter::new(file),
offset: 0,
item_count: 0,
written_blob_bytes: 0,
})
}

Expand Down Expand Up @@ -81,7 +84,7 @@ impl Writer {
///
/// Will return `Err` if an IO error occurs.
pub fn write(&mut self, key: &[u8], value: &[u8]) -> std::io::Result<u32> {
#[cfg(feature = "compression")]
#[cfg(feature = "lz4")]
let value = lz4_flex::compress_prepend_size(value);

let mut hasher = crc32fast::Hasher::new();
Expand All @@ -94,6 +97,8 @@ impl Writer {
self.inner.write_u32::<BigEndian>(value.len() as u32)?;
self.inner.write_all(&value)?;

self.written_blob_bytes += value.len() as u64;

// Key
self.offset += std::mem::size_of::<u16>() as u64;
self.offset += key.len() as u64;
Expand Down
Loading

0 comments on commit 755fe80

Please sign in to comment.