Skip to content

Commit

Permalink
implement blob prefetch, closes #4
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Jul 25, 2024
1 parent 576c843 commit 1a69e26
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 56 deletions.
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@ default = []
serde = ["dep:serde"]

[dependencies]
ahash = "0.8.11"
byteorder = "1.5.0"
crc32fast = "1.4.2"
log = "0.4.21"
lz4_flex = { version = "0.11.3", optional = true }
min-max-heap = "1.3.0"
miniz_oxide = { version = "0.7.3", optional = true }
path-absolutize = "3.1.1"
quick_cache = "0.6.0"
quick_cache = { version = "0.6.0", default-features = false, features = [
"ahash",
] }
rustc-hash = "2.0.0"
serde = { version = "1.0.200", optional = true, features = ["derive", "rc"] }
tempfile = "3.10.1"

Expand All @@ -43,4 +47,3 @@ test-log = "0.2.15"
name = "value_log"
harness = false
path = "benches/value_log.rs"
required-features = ["lz4"]
145 changes: 121 additions & 24 deletions benches/value_log.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,108 @@
use criterion::{criterion_group, criterion_main, Criterion};
use rand::RngCore;
use rand::{Rng, RngCore};
use std::sync::Arc;
use value_log::{BlobCache, Config, ExternalIndex, MockIndex, ValueHandle, ValueLog};
use value_log::{
BlobCache, Config, IndexReader, IndexWriter, MockIndex, MockIndexWriter, ValueLog,
};

/// Micro-benchmark comparing single-key lookup throughput of the standard
/// library `HashMap` (SipHash) against `ahash`'s drop-in `HashMap`.
fn hashmap(c: &mut Criterion) {
    use ahash::HashMapExt;

    let mut group = c.benchmark_group("hashmap");

    // Baseline: std HashMap with the default DoS-resistant hasher.
    group.bench_function("std".to_string(), |b| {
        let mut std_map = std::collections::HashMap::new();
        std_map.insert(4, "a");

        b.iter(|| {
            std_map.get(&4).unwrap();
        })
    });

    // Contender: same workload, ahash-backed map.
    group.bench_function("ahash".to_string(), |b| {
        let mut fast_map = ahash::HashMap::new();
        fast_map.insert(4, "a");

        b.iter(|| {
            fast_map.get(&4).unwrap();
        })
    });
}

/// Benchmarks reading a small contiguous key range from the value log,
/// once with plain per-key `get` calls and once using `get_with_prefetch`
/// on the first key of the range.
fn prefetch(c: &mut Criterion) {
    let mut group = c.benchmark_group("prefetch range");

    // Number of consecutive keys read per benchmark iteration.
    let range_size = 10;
    // Size in bytes of each stored blob.
    let item_size = 1_024;

    let index = MockIndex::default();
    let mut index_writer = MockIndexWriter(index.clone());

    let folder = tempfile::tempdir().unwrap();
    let vl_path = folder.path();

    let value_log = ValueLog::open(vl_path, Config::default()).unwrap();

    let mut writer = value_log.get_writer().unwrap();

    let mut rng = rand::thread_rng();

    // Reuse one buffer across all 2,000,000 insertions instead of allocating
    // (and then pointlessly clearing) a fresh Vec per key; the contents are
    // re-randomized every iteration by fill_bytes.
    let mut data = vec![0u8; item_size];

    for key in (0u64..2_000_000).map(u64::to_be_bytes) {
        rng.fill_bytes(&mut data);

        index_writer
            .insert_indirect(&key, writer.get_next_value_handle(), data.len() as u32)
            .unwrap();

        writer.write(key, &data).unwrap();
    }

    value_log.register_writer(writer).unwrap();

    let mut rng = rand::thread_rng();

    group.bench_function(format!("{range_size}x{item_size}B - no prefetch"), |b| {
        b.iter(|| {
            // Random start so each iteration touches a different range; the
            // upper bound keeps start + range_size within the inserted keys.
            let start = rng.gen_range(0u64..1_999_000);

            for x in start..(start + range_size) {
                let handle = index.get(&x.to_be_bytes()).unwrap().unwrap();

                let value = value_log.get(&handle).unwrap().unwrap();

                assert_eq!(item_size, value.len());
            }
        })
    });

    group.bench_function(format!("{range_size}x{item_size}B - with prefetch"), |b| {
        b.iter(|| {
            let start = rng.gen_range(0u64..1_999_000);

            // First key: fetch it and ask the value log to prefetch the
            // remaining range_size - 1 blobs of the range.
            {
                let handle = index.get(&start.to_be_bytes()).unwrap().unwrap();

                let value = value_log
                    .get_with_prefetch(&handle, (range_size - 1) as usize)
                    .unwrap()
                    .unwrap();

                assert_eq!(item_size, value.len());
            }

            // Remaining keys; presumably served from the prefetched cache.
            // (start + 1)..end replaces the less idiomatic range.skip(1).
            for x in (start + 1)..(start + range_size) {
                let handle = index.get(&x.to_be_bytes()).unwrap().unwrap();

                let value = value_log.get(&handle).unwrap().unwrap();

                assert_eq!(item_size, value.len());
            }
        })
    });
}

fn load_value(c: &mut Criterion) {
let mut group = c.benchmark_group("load blob");
Expand All @@ -22,6 +123,7 @@ fn load_value(c: &mut Criterion) {

{
let index = MockIndex::default();
let mut index_writer = MockIndexWriter(index.clone());

let folder = tempfile::tempdir().unwrap();
let vl_path = folder.path();
Expand All @@ -33,21 +135,19 @@ fn load_value(c: &mut Criterion) {
.unwrap();

let mut writer = value_log.get_writer().unwrap();
let segment_id = writer.segment_id();

let mut rng = rand::thread_rng();

for size in sizes {
let key = size.to_string();
let offset = writer.offset(key.as_bytes());

let mut data = vec![0u8; size];
rng.fill_bytes(&mut data);

index
.insert_indirection(
index_writer
.insert_indirect(
key.as_bytes(),
ValueHandle { offset, segment_id },
writer.get_next_value_handle(),
data.len() as u32,
)
.unwrap();
Expand All @@ -71,6 +171,7 @@ fn load_value(c: &mut Criterion) {

{
let index = MockIndex::default();
let mut index_writer = MockIndexWriter(index.clone());

let folder = tempfile::tempdir().unwrap();
let vl_path = folder.path();
Expand All @@ -83,21 +184,19 @@ fn load_value(c: &mut Criterion) {
.unwrap();

let mut writer = value_log.get_writer().unwrap();
let segment_id = writer.segment_id();

let mut rng = rand::thread_rng();

for size in sizes {
let key = size.to_string();
let offset = writer.offset(key.as_bytes());

let mut data = vec![0u8; size];
rng.fill_bytes(&mut data);

index
.insert_indirection(
index_writer
.insert_indirect(
key.as_bytes(),
ValueHandle { offset, segment_id },
writer.get_next_value_handle(),
data.len() as u32,
)
.unwrap();
Expand All @@ -123,10 +222,11 @@ fn load_value(c: &mut Criterion) {
}
}

fn compression(c: &mut Criterion) {
/* fn compression(c: &mut Criterion) {
let mut group = c.benchmark_group("compression");
let index = MockIndex::default();
let mut index_writer = MockIndexWriter(index.clone());
let folder = tempfile::tempdir().unwrap();
let vl_path = folder.path();
Expand All @@ -138,23 +238,21 @@ fn compression(c: &mut Criterion) {
.unwrap();
let mut writer = value_log.get_writer().unwrap();
let segment_id = writer.segment_id();
let mut rng = rand::thread_rng();
let size_mb = 16;
{
let key = "random";
let offset = writer.offset(key.as_bytes());
let mut data = vec![0u8; size_mb * 1_024 * 1_024];
rng.fill_bytes(&mut data);
index
.insert_indirection(
index_writer
.insert_indirect(
key.as_bytes(),
ValueHandle { offset, segment_id },
writer.get_next_value_handle(),
data.len() as u32,
)
.unwrap();
Expand All @@ -164,15 +262,14 @@ fn compression(c: &mut Criterion) {
{
let key = "good_compression";
let offset = writer.offset(key.as_bytes());
let dummy = b"abcdefgh";
let data = dummy.repeat(size_mb * 1_024 * 1_024 / dummy.len());
index
.insert_indirection(
index_writer
.insert_indirect(
key.as_bytes(),
ValueHandle { offset, segment_id },
writer.get_next_value_handle(),
data.len() as u32,
)
.unwrap();
Expand All @@ -196,7 +293,7 @@ fn compression(c: &mut Criterion) {
value_log.get(&handle_good_compression).unwrap().unwrap();
})
});
}
} */

criterion_group!(benches, load_value, compression);
criterion_group!(benches, hashmap, load_value, prefetch /* , compression */);
criterion_main!(benches);
5 changes: 4 additions & 1 deletion src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ use crate::{
};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::{
collections::HashMap,
io::{Cursor, Write},
path::{Path, PathBuf},
sync::{Arc, RwLock},
};

type HashMap<K, V> = ahash::HashMap<K, V>;

pub const VLOG_MARKER: &str = ".vlog";
pub const SEGMENTS_FOLDER: &str = "segments";
const MANIFEST_FILE: &str = "vlog_manifest";
Expand Down Expand Up @@ -113,6 +114,8 @@ impl SegmentManifest {
Self::remove_unfinished_segments(&segments_folder, &ids)?;

let segments = {
use ahash::HashMapExt;

let mut map = HashMap::with_capacity(100);

for id in ids {
Expand Down
3 changes: 2 additions & 1 deletion src/segment/multi_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ impl MultiWriter {
}
}

#[doc(hidden)]
#[must_use]
fn offset(&self) -> u64 {
pub fn offset(&self) -> u64 {
self.get_active_writer().offset()
}

Expand Down
21 changes: 15 additions & 6 deletions src/segment/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::id::SegmentId;
use byteorder::{BigEndian, ReadBytesExt};
use std::{
fs::File,
io::{BufReader, Read},
path::PathBuf,
io::{BufReader, Read, Seek},
path::Path,
sync::Arc,
};

Expand All @@ -21,15 +21,24 @@ impl Reader {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn new<P: Into<PathBuf>>(path: P, segment_id: SegmentId) -> crate::Result<Self> {
let path = path.into();
pub fn new<P: AsRef<Path>>(path: P, segment_id: SegmentId) -> crate::Result<Self> {
let file_reader = BufReader::new(File::open(path)?);

Ok(Self {
Ok(Self::with_reader(segment_id, file_reader))
}

pub(crate) fn get_offset(&mut self) -> std::io::Result<u64> {
self.inner.stream_position()
}

/// Initializes a new segment reader.
#[must_use]
pub fn with_reader(segment_id: SegmentId, file_reader: BufReader<File>) -> Self {
Self {
segment_id,
inner: file_reader,
is_terminated: false,
})
}
}
}

Expand Down
Loading

0 comments on commit 1a69e26

Please sign in to comment.