Skip to content

Commit

Permalink
init TruncateManager and TruncateWorker
Browse files Browse the repository at this point in the history
  • Loading branch information
longfangsong committed Dec 2, 2021
1 parent 3b15b89 commit 987915d
Show file tree
Hide file tree
Showing 2 changed files with 261 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod snap;
pub mod status_server;
pub mod transport;
pub mod ttl;
mod truncate;

pub use self::config::{Config, ServerConfigManager, DEFAULT_CLUSTER_ID, DEFAULT_LISTENING_ADDR};
pub use self::errors::{Error, Result};
Expand Down
260 changes: 260 additions & 0 deletions src/server/truncate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.

use super::Result;
use engine_rocks::{ RocksEngineIterator, RocksWriteBatch};
use engine_traits::Iterable;
use engine_traits::{IterOptions, Iterator, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE};
use engine_traits::{Mutable, SeekKey};
use std::sync::{Arc, Mutex};
use txn_types::{Key, TimeStamp, Write, WriteRef};
use engine_traits::WriteBatch;
use engine_rocks::RocksEngine;
use engine_traits::WriteBatchExt;
use std::thread::JoinHandle;

const BATCH_SIZE: usize = 256;

#[derive(Debug, Clone)]
pub struct TruncateState {
// todo: estimated_total: usize,
// todo: delete_count: usize,
scan_count: usize,
done: bool,
}

pub struct TruncateWorker {
ts: TimeStamp,
write_iter: RocksEngineIterator,
state: Arc<Mutex<TruncateState>>,
}

impl TruncateWorker {
pub fn new(
mut write_iter: RocksEngineIterator,
ts: TimeStamp,
state: Arc<Mutex<TruncateState>>,
) -> Self {
state.lock().expect("failed to lock `state` in `TruncateWorker::new`").done = false;
write_iter.seek(SeekKey::Start).unwrap();
Self {
write_iter,
ts,
state,
}
}

fn next_write(&mut self) -> Result<Option<(Vec<u8>, Write)>> {
if self.write_iter.valid().unwrap() {
let mut state = self
.state
.lock()
.expect("failed to lock TruncateWorker::state");
state.scan_count += 1;
drop(state);
let write = box_try!(WriteRef::parse(self.write_iter.value())).to_owned();
let key = self.write_iter.key().to_vec();
self.write_iter.next().unwrap();
return Ok(Some((key, write)));
}
Ok(None)
}

fn scan_next_batch(&mut self, batch_size: usize) -> Result<(Vec<(Vec<u8>, Write)>, bool)> {
let mut writes = Vec::with_capacity(batch_size);
let mut has_more = true;
for _ in 0..batch_size {
if let Some((key, write)) = self.next_write()? {
let commit_ts = box_try!(Key::decode_ts_from(keys::origin_key(&key)));
if commit_ts > self.ts {
writes.push((key, write));
}
} else {
has_more = false;
break;
}
}
Ok((writes, has_more))
}

pub fn process_next_batch(
&mut self,
batch_size: usize,
wb: &mut RocksWriteBatch,
) -> Result<bool> {
let (writes, has_more) = self.scan_next_batch(batch_size)?;
for (key, write) in writes {
let default_key = Key::from_encoded_slice(&key).truncate_ts().unwrap().append_ts(write.start_ts);
box_try!(wb.delete_cf(CF_WRITE, &key));
box_try!(wb.delete_cf(CF_DEFAULT, default_key.as_encoded()));
}
wb.write().unwrap();
if !has_more {
self.state.lock().expect("failed to lock `TruncateWorker::state` in `TruncateWorker::process_next_batch`").done = true;
}
wb.clear();
Ok(has_more)
}
}


pub struct TruncateManager {
state: Arc<Mutex<TruncateState>>,
engine: RocksEngine,
worker_handle: Option<JoinHandle<()>>,
}

impl TruncateManager {
pub fn new(engine: RocksEngine) -> Self {
let state = Arc::new(Mutex::new(TruncateState {
scan_count: 0,
done: false
}));
TruncateManager {
state,
engine,
worker_handle: None,
}
}

pub fn start(&mut self, ts: TimeStamp) {
let readopts = IterOptions::new(None, None, false);
let write_iter = self.engine.iterator_cf_opt(CF_WRITE, readopts).unwrap();
let mut worker = TruncateWorker::new(write_iter, ts, self.state.clone());
let mut wb = self.engine.write_batch();
let props = tikv_util::thread_group::current_properties();
self.worker_handle = Some(std::thread::Builder::new()
.name("truncate".to_string())
.spawn(move || {
tikv_util::thread_group::set_properties(props);
tikv_alloc::add_thread_memory_accessor();

while worker.process_next_batch(BATCH_SIZE, &mut wb).expect("truncate failed") {
}

tikv_alloc::remove_thread_memory_accessor();
})
.expect("failed to spawn truncate thread"));
}

pub fn state(&self) -> TruncateState {
self.state.lock().expect("failed to lock `state` in `TruncateManager::state`").clone()
}

#[cfg(test)]
pub fn wait(&mut self) {
self.worker_handle.take().unwrap().join().unwrap();
}
}

#[cfg(test)]
mod tests {
use super::*;
use engine_rocks::raw::{ColumnFamilyOptions, DBOptions};
use engine_rocks::raw_util::CFOptions;
use engine_rocks::Compat;
use engine_traits::{WriteBatch, WriteBatchExt};
use tempfile::Builder;
use txn_types::WriteType;

#[test]
fn test_basic() {
enum Expect {
Keep,
Remove,
}

let tmp = Builder::new()
.prefix("test_basic")
.tempdir()
.unwrap();
let path = tmp.path().to_str().unwrap();
let fake_engine = Arc::new(
engine_rocks::raw_util::new_engine_opt(
path,
DBOptions::new(),
vec![
CFOptions::new(CF_DEFAULT, ColumnFamilyOptions::new()),
CFOptions::new(CF_WRITE, ColumnFamilyOptions::new()),
CFOptions::new(CF_LOCK, ColumnFamilyOptions::new()),
CFOptions::new(CF_RAFT, ColumnFamilyOptions::new()),
],
)
.unwrap(),
);

let write = vec![
// key, start_ts, commit_ts
(b"k", 104, 105, Expect::Remove),
(b"k", 102, 103, Expect::Remove),
(b"k", 100, 101, Expect::Keep),
(b"k", 98, 99, Expect::Keep),
];
let default = vec![
// key, start_ts
(b"k", 104, Expect::Remove),
(b"k", 102, Expect::Remove),
(b"k", 100, Expect::Keep),
(b"k", 98, Expect::Keep),
];
let mut kv = vec![];
for (key, start_ts, commit_ts, expect) in write {
let write = Write::new(WriteType::Put, start_ts.into(), None);
kv.push((
CF_WRITE,
Key::from_raw(key).append_ts(commit_ts.into()),
write.as_ref().to_bytes(),
expect,
));
}
for (key, ts, expect) in default {
kv.push((
CF_DEFAULT,
Key::from_raw(key).append_ts(ts.into()),
b"v".to_vec(),
expect,
));
}

let mut wb = fake_engine.c().write_batch();
for &(cf, ref k, ref v, _) in &kv {
wb.put_cf(cf, &keys::data_key(k.as_encoded()), v).unwrap();
}
wb.write().unwrap();

let mut manager = TruncateManager::new(fake_engine.c().clone());
manager.start(100.into());
manager.wait();

let readopts = IterOptions::new(None, None, false);
let mut write_iter = fake_engine
.c()
.iterator_cf_opt(CF_WRITE, readopts.clone())
.unwrap();
write_iter.seek(SeekKey::Start).unwrap();
let mut remaining_writes = vec![];
while write_iter.valid().unwrap() {
let write = WriteRef::parse(write_iter.value()).unwrap().to_owned();
let key = write_iter.key().to_vec();
write_iter.next().unwrap();
remaining_writes.push((key, write));
}
let mut default_iter = fake_engine
.c()
.iterator_cf_opt(CF_DEFAULT, readopts)
.unwrap();
default_iter.seek(SeekKey::Start).unwrap();
let mut remaining_defaults = vec![];
while default_iter.valid().unwrap() {
let key = default_iter.key().to_vec();
let value = default_iter.value().to_vec();
default_iter.next().unwrap();
remaining_defaults.push((key, value));
}
assert_eq!(remaining_writes.len(), 1);
let (key, _) = &remaining_writes[0];
assert_eq!(Key::from_encoded(key.clone()).decode_ts().unwrap(), 99.into());
assert_eq!(remaining_defaults.len(), 1);
let (key, _) = &remaining_defaults[0];
assert_eq!(Key::from_encoded(key.clone()).decode_ts().unwrap(), 98.into());
}
}

0 comments on commit 987915d

Please sign in to comment.