stash, todo: rebase me
MrCroxx committed Aug 21, 2024
1 parent 744cd9e commit 93f2632
Showing 17 changed files with 1,591 additions and 33 deletions.
1 change: 0 additions & 1 deletion foyer-common/Cargo.toml
@@ -11,7 +11,6 @@ readme = "../README.md"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bitvec = "1"
bytes = "1"
cfg-if = "1"
crossbeam = "0.8"
2 changes: 0 additions & 2 deletions foyer-common/src/lib.rs
@@ -26,8 +26,6 @@ pub mod bits;
pub mod buf;
/// The trait for the key and value encoding and decoding.
pub mod code;
/// Bloom filters.
pub mod compact_bloom_filter;
/// A concurrent count down util.
pub mod countdown;
/// Components for monitoring internal events.
1 change: 1 addition & 0 deletions foyer-storage/Cargo.toml
@@ -31,6 +31,7 @@ itertools = { workspace = true }
lazy_static = "1"
libc = "0.2"
lz4 = "1.24"
ordered_hash_map = "0.4"
parking_lot = { version = "0.12", features = ["arc_lock"] }
pin-project = "1"
rand = "0.8"
2 changes: 1 addition & 1 deletion foyer-storage/src/large/flusher.rs
@@ -178,7 +178,7 @@ where
value_len: info.value_len as _,
hash: entry.hash(),
sequence,
checksum: Checksummer::checksum(&buffer),
checksum: Checksummer::checksum64(&buffer),
compression: self.compression,
};

12 changes: 9 additions & 3 deletions foyer-storage/src/serde.rs
@@ -13,7 +13,7 @@
// limitations under the License.

use std::{fmt::Debug, hash::Hasher};
use twox_hash::XxHash64;
use twox_hash::{XxHash32, XxHash64};

use foyer_common::code::{StorageKey, StorageValue};

@@ -27,11 +27,17 @@ use crate::{
pub struct Checksummer;

impl Checksummer {
pub fn checksum(buf: &[u8]) -> u64 {
pub fn checksum64(buf: &[u8]) -> u64 {
let mut hasher = XxHash64::with_seed(0);
hasher.write(buf);
hasher.finish()
}

pub fn checksum32(buf: &[u8]) -> u32 {
let mut hasher = XxHash32::with_seed(0);
hasher.write(buf);
hasher.finish() as u32
}
}

#[derive(Debug)]
@@ -115,7 +121,7 @@ impl EntryDeserializer {

// calculate checksum if needed
if let Some(expected) = checksum {
let get = Checksummer::checksum(&buffer[..value_len + ken_len]);
let get = Checksummer::checksum64(&buffer[..value_len + ken_len]);
if expected != get {
return Err(Error::ChecksumMismatch { expected, get });
}
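
The hunk above renames the existing helper to `checksum64` and adds a 32-bit sibling. A minimal standalone sketch of the behavior, assuming only the `twox_hash` crate already used here: the two digests come from different algorithms (XxHash64 vs. XxHash32), so `checksum32` is not a truncation of `checksum64`.

use std::hash::Hasher;
use twox_hash::{XxHash32, XxHash64};

// Standalone mirror of the two helpers, for illustration only.
fn checksum64(buf: &[u8]) -> u64 {
    let mut hasher = XxHash64::with_seed(0);
    hasher.write(buf);
    hasher.finish()
}

fn checksum32(buf: &[u8]) -> u32 {
    let mut hasher = XxHash32::with_seed(0);
    hasher.write(buf);
    // XxHash32 reports its 32-bit digest through the u64 `finish`, so the cast is lossless.
    hasher.finish() as u32
}

fn main() {
    let buf = b"serialized key and value bytes";
    println!("xx64 = {:016x}", checksum64(buf));
    println!("xx32 = {:08x}", checksum32(buf));
}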
289 changes: 289 additions & 0 deletions foyer-storage/src/small/batch.rs
@@ -0,0 +1,289 @@
// Copyright 2024 Foyer Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{
collections::{hash_map::HashMap, HashSet},
fmt::Debug,
};

use foyer_common::code::{HashBuilder, StorageKey, StorageValue};
use foyer_memory::CacheEntry;
use tokio::sync::oneshot;

use crate::{serde::KvInfo, small::serde::EntryHeader, IoBytes, IoBytesMut};

use super::set::SetId;

struct Insertion<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
entry: CacheEntry<K, V, S>,
buffer: IoBytes,
info: KvInfo,
}

struct Deletion {
hash: u64,
}

struct Entry<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
insertion: Insertion<K, V, S>,
prev_hash: Option<u64>,
next_hash: Option<u64>,
}

struct SetBatchMut<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
entries: HashMap<u64, Entry<K, V, S>>,
deletions: HashMap<u64, Deletion>,

head_hash: Option<u64>,
tail_hash: Option<u64>,

len: usize,
capacity: usize,
}

impl<K, V, S> Debug for SetBatchMut<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SetMut")
.field("count", &self.entries.len())
.field("len", &self.len)
.finish()
}
}

impl<K, V, S> SetBatchMut<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
fn new(capacity: usize) -> Self {
Self {
entries: HashMap::new(),
deletions: HashMap::new(),
head_hash: None,
tail_hash: None,
len: 0,
capacity,
}
}

fn insert(&mut self, insertion: Insertion<K, V, S>) {
self.deletions.remove(&insertion.entry.hash());

let mut entry = Entry {
insertion,
prev_hash: None,
next_hash: None,
};

self.list_push(&mut entry);
self.len += EntryHeader::ENTRY_HEADER_SIZE + entry.insertion.buffer.len();
if let Some(mut old) = self.entries.insert(entry.insertion.entry.hash(), entry) {
self.list_unlink(&mut old);
self.len -= EntryHeader::ENTRY_HEADER_SIZE + old.insertion.buffer.len();
}

while self.len > self.capacity {
let entry = self.pop().unwrap();
self.len -= EntryHeader::ENTRY_HEADER_SIZE + entry.insertion.buffer.len();
}
}

fn delete(&mut self, deletion: Deletion) {
if let Some(mut entry) = self.entries.remove(&deletion.hash) {
self.list_unlink(&mut entry);
self.len -= EntryHeader::ENTRY_HEADER_SIZE + entry.insertion.buffer.len();
}

self.deletions.insert(deletion.hash, deletion);
}

fn freeze(mut self) -> SetBatch<K, V, S> {
let mut buf = IoBytesMut::with_capacity(self.len);
let mut entries = Vec::with_capacity(self.entries.len());
let mut deletions = HashSet::with_capacity(self.entries.len() + self.deletions.len());
let mut insertions = Vec::with_capacity(self.entries.len());

while let Some(entry) = self.pop() {
let header = EntryHeader::new(
entry.insertion.entry.hash(),
entry.insertion.info.key_len,
entry.insertion.info.value_len,
);
header.write(&mut buf);
deletions.insert(entry.insertion.entry.hash());
insertions.push(entry.insertion.entry.hash());
buf.copy_from_slice(&entry.insertion.buffer);
entries.push(entry.insertion.entry);
}

for deletion in self.deletions.into_values() {
deletions.insert(deletion.hash);
}

assert_eq!(buf.len(), self.len);

SetBatch {
deletions,
insertions,
bytes: buf.freeze(),
entries,
}
}

fn list_unlink(&mut self, entry: &mut Entry<K, V, S>) {
if let Some(prev_hash) = entry.prev_hash {
self.entries.get_mut(&prev_hash).unwrap().next_hash = entry.next_hash;
} else {
assert_eq!(self.head_hash, Some(entry.insertion.entry.hash()));
self.head_hash = entry.next_hash;
}
if let Some(next_hash) = entry.next_hash {
self.entries.get_mut(&next_hash).unwrap().prev_hash = entry.prev_hash;
} else {
assert_eq!(self.tail_hash, Some(entry.insertion.entry.hash()));
self.tail_hash = entry.prev_hash;
}
entry.prev_hash = None;
entry.next_hash = None;
}

fn list_push(&mut self, entry: &mut Entry<K, V, S>) {
assert!(entry.prev_hash.is_none());
assert!(entry.next_hash.is_none());

if let Some(tail_hash) = self.tail_hash {
let tail = self.entries.get_mut(&tail_hash).unwrap();

tail.next_hash = Some(entry.insertion.entry.hash());
entry.prev_hash = Some(tail_hash);

self.tail_hash = Some(entry.insertion.entry.hash());
} else {
assert!(self.head_hash.is_none());

self.head_hash = Some(entry.insertion.entry.hash());
self.tail_hash = Some(entry.insertion.entry.hash());
}
}

fn pop(&mut self) -> Option<Entry<K, V, S>> {
let head_hash = self.head_hash?;
let mut entry = self.entries.remove(&head_hash).unwrap();
self.list_unlink(&mut entry);
Some(entry)
}
}

#[derive(Debug)]
pub struct BatchMut<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
sets: HashMap<SetId, SetBatchMut<K, V, S>>,
/// Total set count.
total: SetId,
/// Set data capacity.
set_capacity: usize,

waiters: Vec<oneshot::Sender<()>>,
}

impl<K, V, S> BatchMut<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
pub fn new(total: SetId, set_data_size: usize) -> Self {
Self {
sets: HashMap::new(),
total,
set_capacity: set_data_size,
waiters: vec![],
}
}

pub fn append(&mut self, entry: CacheEntry<K, V, S>, buffer: IoBytes, info: KvInfo) {
let sid = entry.hash() % self.total;
let set = self.sets.entry(sid).or_insert(SetBatchMut::new(self.set_capacity));
let insertion = Insertion { entry, buffer, info };
set.insert(insertion);
}

pub fn delete(&mut self, hash: u64) {
let sid = hash % self.total;
let set = self.sets.entry(sid).or_insert(SetBatchMut::new(self.set_capacity));
set.delete(Deletion { hash })
}

pub fn rotate(&mut self) -> Batch<K, V, S> {
let sets = std::mem::take(&mut self.sets);
let sets = sets.into_iter().map(|(id, set)| (id, set.freeze())).collect();
let waiters = std::mem::take(&mut self.waiters);
Batch { sets, waiters }
}

/// Register a waiter to be notified after the batch is finished.
pub fn wait(&mut self) -> oneshot::Receiver<()> {
tracing::trace!("[batch]: register waiter");
let (tx, rx) = oneshot::channel();
self.waiters.push(tx);
rx
}
}

pub struct Batch<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
pub sets: HashMap<SetId, SetBatch<K, V, S>>,
pub waiters: Vec<oneshot::Sender<()>>,
}

pub struct SetBatch<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
pub deletions: HashSet<u64>,
pub insertions: Vec<u64>,
pub bytes: IoBytes,
pub entries: Vec<CacheEntry<K, V, S>>,
}
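
For context on the file above: `SetBatchMut` threads a doubly-linked list through its `HashMap` of pending insertions, keyed by entry hash, so the oldest insertion can be evicted in FIFO order once the set's byte budget is exceeded, while any entry can still be unlinked in O(1) when it is overwritten or deleted. A simplified, self-contained sketch of that list-over-map pattern (names like `LinkedMap` and `Node` are illustrative, not from the crate):

use std::collections::HashMap;

struct Node {
    value: &'static str,
    prev: Option<u64>,
    next: Option<u64>,
}

#[derive(Default)]
struct LinkedMap {
    nodes: HashMap<u64, Node>,
    head: Option<u64>,
    tail: Option<u64>,
}

impl LinkedMap {
    fn push(&mut self, hash: u64, value: &'static str) {
        // Drop any previous entry with the same hash before appending at the tail.
        let _ = self.remove(hash);
        let mut node = Node { value, prev: None, next: None };
        if let Some(tail) = self.tail {
            self.nodes.get_mut(&tail).unwrap().next = Some(hash);
            node.prev = Some(tail);
        } else {
            self.head = Some(hash);
        }
        self.tail = Some(hash);
        self.nodes.insert(hash, node);
    }

    fn remove(&mut self, hash: u64) -> Option<&'static str> {
        // Unlink in O(1) by patching the neighbors (or the head/tail pointers).
        let node = self.nodes.remove(&hash)?;
        match node.prev {
            Some(prev) => self.nodes.get_mut(&prev).unwrap().next = node.next,
            None => self.head = node.next,
        }
        match node.next {
            Some(next) => self.nodes.get_mut(&next).unwrap().prev = node.prev,
            None => self.tail = node.prev,
        }
        Some(node.value)
    }

    fn pop(&mut self) -> Option<&'static str> {
        // FIFO eviction: remove the oldest (head) entry.
        let head = self.head?;
        self.remove(head)
    }
}

fn main() {
    let mut batch = LinkedMap::default();
    batch.push(1, "a");
    batch.push(2, "b");
    batch.push(3, "c");
    batch.push(2, "b2"); // overwriting an entry moves it to the tail
    batch.remove(3);     // O(1) delete by hash
    assert_eq!(batch.pop(), Some("a"));
    assert_eq!(batch.pop(), Some("b2"));
    assert_eq!(batch.pop(), None);
}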