Skip to content

Commit

Permalink
stash, todo: rebase me
Browse files Browse the repository at this point in the history
  • Loading branch information
MrCroxx committed Aug 9, 2024
1 parent 6e973a6 commit 30012e3
Show file tree
Hide file tree
Showing 20 changed files with 1,395 additions and 101 deletions.
1 change: 1 addition & 0 deletions foyer-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ itertools = { workspace = true }
lazy_static = "1"
libc = "0.2"
lz4 = "1.24"
ordered_hash_map = "0.4"
parking_lot = { version = "0.12", features = ["arc_lock"] }
pin-project = "1"
rand = "0.8"
Expand Down
2 changes: 1 addition & 1 deletion foyer-storage/src/device/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.use std::marker::PhantomData;
// limitations under the License.

use std::{
fmt::Debug,
Expand Down
12 changes: 0 additions & 12 deletions foyer-storage/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,18 +382,6 @@ where
}
}

async fn wait(&self) -> Result<()> {
match self {
Engine::Noop(storage) => storage.wait().await,
Engine::Large(storage) => storage.wait().await,
Engine::LargeRuntime(storage) => storage.wait().await,
Engine::Small(storage) => storage.wait().await,
Engine::SmallRuntime(storage) => storage.wait().await,
Engine::Combined(storage) => storage.wait().await,
Engine::CombinedRuntime(storage) => storage.wait().await,
}
}

fn runtime(&self) -> &tokio::runtime::Handle {
match self {
Engine::Noop(storage) => storage.runtime(),
Expand Down
2 changes: 1 addition & 1 deletion foyer-storage/src/io_buffer_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.use std::marker::PhantomData;
// limitations under the License.

use std::collections::VecDeque;

Expand Down
45 changes: 31 additions & 14 deletions foyer-storage/src/large/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ use futures::future::{try_join, try_join_all};
use parking_lot::Mutex;
use std::{
fmt::Debug,
sync::{atomic::Ordering, Arc},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::{
runtime::Handle,
sync::{oneshot, Notify, Semaphore},
sync::{oneshot, Notify},
task::JoinHandle,
};

use super::{
Expand Down Expand Up @@ -82,10 +86,11 @@ where

notify: Arc<Notify>,

flight: Arc<Semaphore>,

compression: Compression,
metrics: Arc<Metrics>,

handle: Arc<Mutex<Option<JoinHandle<()>>>>,
stop: Arc<AtomicBool>,
}

impl<K, V, S> Clone for Flusher<K, V, S>
Expand All @@ -98,9 +103,10 @@ where
Self {
batch: self.batch.clone(),
notify: self.notify.clone(),
flight: self.flight.clone(),
compression: self.compression,
metrics: self.metrics.clone(),
handle: self.handle.clone(),
stop: self.stop.clone(),
}
}
}
Expand Down Expand Up @@ -133,7 +139,7 @@ where
indexer.clone(),
)));

let flight = Arc::new(Semaphore::new(1));
let stop = Arc::<AtomicBool>::default();

let runner = Runner {
batch: batch.clone(),
Expand All @@ -144,20 +150,23 @@ where
flush: config.flush,
stats,
metrics: metrics.clone(),
stop: stop.clone(),
};

runtime.spawn(async move {
let handle = runtime.spawn(async move {
if let Err(e) = runner.run().await {
tracing::error!("[flusher]: flusher exit with error: {e}");
tracing::error!("[lodc flusher]: flusher exit with error: {e}");
}
});
let handle = Arc::new(Mutex::new(Some(handle)));

Ok(Self {
batch,
notify,
flight,
compression: config.compression,
metrics,
handle,
stop,
})
}

Expand All @@ -176,9 +185,13 @@ where
self.notify.notify_one();
}

pub async fn wait(&self) -> Result<()> {
// TODO(MrCroxx): Consider a better implementation?
let _permit = self.flight.acquire().await;
pub async fn close(&self) -> Result<()> {
self.stop.store(true, Ordering::SeqCst);
self.notify.notify_one();
let handle = self.handle.lock().take();
if let Some(handle) = handle {
handle.await.unwrap();
}
Ok(())
}

Expand All @@ -195,7 +208,7 @@ where
value_len: info.value_len as _,
hash: entry.hash(),
sequence,
checksum: Checksummer::checksum(&buffer),
checksum: Checksummer::checksum64(&buffer),
compression: self.compression,
};

Expand Down Expand Up @@ -247,6 +260,8 @@ where

stats: Arc<Statistics>,
metrics: Arc<Metrics>,

stop: Arc<AtomicBool>,
}

impl<K, V, S> Runner<K, V, S>
Expand All @@ -256,12 +271,14 @@ where
S: HashBuilder + Debug,
{
pub async fn run(self) -> Result<()> {
// TODO(MrCroxx): Graceful shutdown.
loop {
let rotation = self.batch.lock().rotate();
let (batch, wait) = match rotation {
Some(rotation) => rotation,
None => {
if self.stop.load(Ordering::SeqCst) {
return Ok(());
}
self.notify.notified().await;
continue;
}
Expand Down
16 changes: 4 additions & 12 deletions foyer-storage/src/large/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.use std::marker::PhantomData;
// limitations under the License.

use std::{
borrow::Borrow,
Expand Down Expand Up @@ -287,15 +287,11 @@ where
})
}

async fn wait(&self) -> Result<()> {
try_join_all(self.inner.flushers.iter().map(|flusher| flusher.wait())).await?;
join_all(self.inner.reclaimers.iter().map(|reclaimer| reclaimer.wait())).await;
Ok(())
}

async fn close(&self) -> Result<()> {
self.inner.active.store(false, Ordering::Relaxed);
self.wait().await
try_join_all(self.inner.flushers.iter().map(|flusher| flusher.close())).await?;
join_all(self.inner.reclaimers.iter().map(|reclaimer| reclaimer.wait())).await;
Ok(())
}

#[fastrace::trace(name = "foyer::storage::large::generic::enqueue")]
Expand Down Expand Up @@ -528,10 +524,6 @@ where
self.inner.device.stat().clone()
}

async fn wait(&self) -> Result<()> {
self.wait().await
}

fn runtime(&self) -> &Handle {
self.runtime()
}
Expand Down
12 changes: 9 additions & 3 deletions foyer-storage/src/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::{fmt::Debug, hash::Hasher};
use twox_hash::XxHash64;
use twox_hash::{XxHash32, XxHash64};

use foyer_common::code::{StorageKey, StorageValue};

Expand All @@ -27,11 +27,17 @@ use crate::{
pub struct Checksummer;

impl Checksummer {
pub fn checksum(buf: &[u8]) -> u64 {
pub fn checksum64(buf: &[u8]) -> u64 {
let mut hasher = XxHash64::with_seed(0);
hasher.write(buf);
hasher.finish()
}

pub fn checksum32(buf: &[u8]) -> u32 {
let mut hasher = XxHash32::with_seed(0);
hasher.write(buf);
hasher.finish() as u32
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -113,7 +119,7 @@ impl EntryDeserializer {

// calculate checksum if needed
if let Some(expected) = checksum {
let get = Checksummer::checksum(&buffer[..value_len + ken_len]);
let get = Checksummer::checksum64(&buffer[..value_len + ken_len]);
if expected != get {
return Err(Error::ChecksumMismatch { expected, get });
}
Expand Down
Loading

0 comments on commit 30012e3

Please sign in to comment.