From 32694f6df4d6fd97b221fa478ddd1e96332b581f Mon Sep 17 00:00:00 2001 From: longfangsong Date: Fri, 7 Jan 2022 09:45:54 +0800 Subject: [PATCH] Add return to version ref #11555 Signed-off-by: longfangsong --- Cargo.lock | 2 +- Cargo.toml | 4 +- cmd/tikv-ctl/src/cmd.rs | 4 + cmd/tikv-ctl/src/executor.rs | 13 ++ cmd/tikv-ctl/src/main.rs | 3 + src/server/debug.rs | 8 + src/server/mod.rs | 1 + src/server/reset_to_version.rs | 379 +++++++++++++++++++++++++++++++++ src/server/service/debug.rs | 10 + 9 files changed, 421 insertions(+), 3 deletions(-) create mode 100644 src/server/reset_to_version.rs diff --git a/Cargo.lock b/Cargo.lock index 495e2996a3f0..6d8cc6744d52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2363,7 +2363,7 @@ dependencies = [ [[package]] name = "kvproto" version = "0.0.2" -source = "git+https://github.com/pingcap/kvproto.git#7a8280c36daf05e44f72130f3fd4e585ea2c62d8" +source = "git+https://github.com/longfangsong/kvproto?rev=b7f0ce4e3945d2f5c7081372dd7d23487c1a659b#b7f0ce4e3945d2f5c7081372dd7d23487c1a659b" dependencies = [ "futures 0.3.15", "grpcio", diff --git a/Cargo.toml b/Cargo.toml index 20feb20f2a8f..b00c85532b60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -198,8 +198,8 @@ procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "5125fc1a69496b7 # When you modify TiKV cooperatively with kvproto, this will be useful to submit the PR to TiKV and the PR to # kvproto at the same time. # After the PR to kvproto is merged, remember to comment this out and run `cargo update -p kvproto`. -# [patch.'https://github.com/pingcap/kvproto'] -# kvproto = {git = "https://github.com/your_github_id/kvproto", branch="your_branch"} +[patch.'https://github.com/pingcap/kvproto'] +kvproto = {git = "https://github.com/longfangsong/kvproto", rev = "b7f0ce4e3945d2f5c7081372dd7d23487c1a659b"} [workspace] # See https://github.com/rust-lang/rfcs/blob/master/text/2957-cargo-features2.md diff --git a/cmd/tikv-ctl/src/cmd.rs b/cmd/tikv-ctl/src/cmd.rs index 0375d9658eba..47b295f9c785 100644 --- a/cmd/tikv-ctl/src/cmd.rs +++ b/cmd/tikv-ctl/src/cmd.rs @@ -545,6 +545,10 @@ pub enum Cmd { /// PD endpoints pd: String, }, + ResetToVersion { + #[structopt(short="v")] + version: u64 + }, #[structopt(external_subcommand)] External(Vec), } diff --git a/cmd/tikv-ctl/src/executor.rs b/cmd/tikv-ctl/src/executor.rs index f7418410b12a..4e35450d700b 100644 --- a/cmd/tikv-ctl/src/executor.rs +++ b/cmd/tikv-ctl/src/executor.rs @@ -632,6 +632,8 @@ pub trait DebugExecutor { fn dump_store_info(&self); fn dump_cluster_info(&self); + + fn reset_to_version(&self, version: u64); } impl DebugExecutor for DebugClient { @@ -839,6 +841,13 @@ impl DebugExecutor for DebugClient { .unwrap_or_else(|e| perror_and_exit("DebugClient::get_cluster_info", e)); println!("{}", resp.get_cluster_id()) } + + fn reset_to_version(&self, version: u64) { + let mut req = ResetToVersionRequest::default(); + req.set_ts(version); + self.reset_to_version(&req) + .unwrap_or_else(|e| perror_and_exit("DebugClient::reset_to_version", e)); + } } impl DebugExecutor for Debugger { @@ -1069,6 +1078,10 @@ impl DebugExecutor for Debugger { println!("cluster id: {}", ident.get_cluster_id()); } } + + fn reset_to_version(&self, version: u64) { + Debugger::reset_to_version(self, version); + } } fn handle_engine_error(err: EngineError) -> ! { diff --git a/cmd/tikv-ctl/src/main.rs b/cmd/tikv-ctl/src/main.rs index ec64792aad9b..3db79fe4f72a 100644 --- a/cmd/tikv-ctl/src/main.rs +++ b/cmd/tikv-ctl/src/main.rs @@ -472,6 +472,9 @@ fn main() { Cmd::Cluster {} => { debug_executor.dump_cluster_info(); } + Cmd::ResetToVersion { version } => { + debug_executor.reset_to_version(version); + } _ => { unreachable!() } diff --git a/src/server/debug.rs b/src/server/debug.rs index 84567f1c7862..c3fbf7c5fda4 100644 --- a/src/server/debug.rs +++ b/src/server/debug.rs @@ -34,6 +34,7 @@ use tikv_util::worker::Worker; use txn_types::Key; use crate::config::ConfigController; +use crate::server::reset_to_version::ResetToVersionManager; use crate::storage::mvcc::{Lock, LockType, TimeStamp, Write, WriteRef, WriteType}; pub use crate::storage::mvcc::MvccInfoIterator; @@ -117,6 +118,7 @@ impl From for debugpb::BottommostLevelCompaction { #[derive(Clone)] pub struct Debugger { engines: Engines, + reset_to_version_manager: ResetToVersionManager, cfg_controller: ConfigController, } @@ -125,8 +127,10 @@ impl Debugger { engines: Engines, cfg_controller: ConfigController, ) -> Debugger { + let reset_to_version_manager = ResetToVersionManager::new(engines.kv.clone()); Debugger { engines, + reset_to_version_manager, cfg_controller, } } @@ -882,6 +886,10 @@ impl Debugger { props.append(&mut props1); Ok(props) } + + pub fn reset_to_version(&self, version: u64) { + self.reset_to_version_manager.start(version.into()); + } } fn dump_default_cf_properties( diff --git a/src/server/mod.rs b/src/server/mod.rs index 76d193890880..85d4b6e5350d 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -13,6 +13,7 @@ pub mod node; mod proxy; pub mod raftkv; pub mod resolve; +mod reset_to_version; pub mod server; pub mod service; pub mod snap; diff --git a/src/server/reset_to_version.rs b/src/server/reset_to_version.rs new file mode 100644 index 000000000000..f34ad9f66358 --- /dev/null +++ b/src/server/reset_to_version.rs @@ -0,0 +1,379 @@ +// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. + +use super::Result; +use engine_rocks::RocksEngine; +use engine_rocks::{RocksEngineIterator, RocksWriteBatch}; +use engine_traits::WriteBatch; +use engine_traits::WriteBatchExt; +use engine_traits::{IterOptions, Iterator, CF_DEFAULT, CF_WRITE}; +use engine_traits::{Iterable, CF_LOCK}; +use engine_traits::{Mutable, SeekKey}; +use std::cell::RefCell; +use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; +use txn_types::{Key, TimeStamp, Write, WriteRef}; + +const BATCH_SIZE: usize = 256; + +#[derive(Debug, Clone)] +pub enum ResetToVersionState { + RemoveWrite { scanned: usize }, + RemoveLock { scanned: usize }, + Done, +} + +impl ResetToVersionState { + pub fn scanned(&mut self) -> &mut usize { + match self { + ResetToVersionState::RemoveWrite { scanned } => scanned, + ResetToVersionState::RemoveLock { scanned } => scanned, + _ => unreachable!(), + } + } +} + +pub struct ResetToVersionWorker { + ts: TimeStamp, + write_iter: RocksEngineIterator, + lock_iter: RocksEngineIterator, + state: Arc>, +} + +struct Batch { + writes: Vec<(Vec, Write)>, + has_more: bool, +} + +#[allow(dead_code)] +impl ResetToVersionWorker { + pub fn new( + mut write_iter: RocksEngineIterator, + mut lock_iter: RocksEngineIterator, + ts: TimeStamp, + state: Arc>, + ) -> Self { + *state + .lock() + .expect("failed to lock `state` in `ResetToVersionWorker::new`") = + ResetToVersionState::RemoveWrite { scanned: 0 }; + write_iter.seek(SeekKey::Start).unwrap(); + lock_iter.seek(SeekKey::Start).unwrap(); + Self { + write_iter, + lock_iter, + ts, + state, + } + } + + fn next_write(&mut self) -> Result, Write)>> { + if self.write_iter.valid().unwrap() { + let mut state = self + .state + .lock() + .expect("failed to lock ResetToVersionWorker::state"); + debug_assert!(matches!( + *state, + ResetToVersionState::RemoveWrite { scanned: _ } + )); + *state.scanned() += 1; + drop(state); + let write = box_try!(WriteRef::parse(self.write_iter.value())).to_owned(); + let key = self.write_iter.key().to_vec(); + self.write_iter.next().unwrap(); + return Ok(Some((key, write))); + } + Ok(None) + } + + fn scan_next_batch(&mut self, batch_size: usize) -> Result { + let mut writes = Vec::with_capacity(batch_size); + let mut has_more = true; + for _ in 0..batch_size { + if let Some((key, write)) = self.next_write()? { + let commit_ts = box_try!(Key::decode_ts_from(keys::origin_key(&key))); + if commit_ts > self.ts { + writes.push((key, write)); + } + } else { + has_more = false; + break; + } + } + Ok(Batch { writes, has_more }) + } + + pub fn process_next_batch( + &mut self, + batch_size: usize, + wb: &mut RocksWriteBatch, + ) -> Result { + let Batch { writes, has_more } = self.scan_next_batch(batch_size)?; + for (key, write) in writes { + let default_key = Key::from_encoded_slice(&key) + .truncate_ts() + .unwrap() + .append_ts(write.start_ts); + box_try!(wb.delete_cf(CF_WRITE, &key)); + box_try!(wb.delete_cf(CF_DEFAULT, default_key.as_encoded())); + } + wb.write().unwrap(); + wb.clear(); + Ok(has_more) + } + + pub fn process_next_batch_lock( + &mut self, + batch_size: usize, + wb: &mut RocksWriteBatch, + ) -> Result { + let mut has_more = true; + for _ in 0..batch_size { + if self.lock_iter.valid().unwrap() { + let mut state = self + .state + .lock() + .expect("failed to lock ResetToVersionWorker::state"); + debug_assert!(matches!( + *state, + ResetToVersionState::RemoveLock { scanned: _ } + )); + *state.scanned() += 1; + drop(state); + + box_try!(wb.delete_cf(CF_LOCK, self.lock_iter.key())); + self.lock_iter.next().unwrap(); + } else { + has_more = false; + break; + } + } + wb.write().unwrap(); + Ok(has_more) + } +} +pub struct ResetToVersionManager { + state: Arc>, + engine: RocksEngine, + worker_handle: RefCell>>, +} + +impl Clone for ResetToVersionManager { + fn clone(&self) -> Self { + Self { + state: self.state.clone(), + engine: self.engine.clone(), + worker_handle: RefCell::new(None), + } + } +} + +#[allow(dead_code)] +impl ResetToVersionManager { + pub fn new(engine: RocksEngine) -> Self { + let state = Arc::new(Mutex::new(ResetToVersionState::RemoveWrite { scanned: 0 })); + ResetToVersionManager { + state, + engine, + worker_handle: RefCell::new(None), + } + } + + pub fn start(&self, ts: TimeStamp) { + let readopts = IterOptions::new(None, None, false); + let write_iter = self + .engine + .iterator_cf_opt(CF_WRITE, readopts.clone()) + .unwrap(); + let lock_iter = self.engine.iterator_cf_opt(CF_LOCK, readopts).unwrap(); + let mut worker = ResetToVersionWorker::new(write_iter, lock_iter, ts, self.state.clone()); + let mut wb = self.engine.write_batch(); + let props = tikv_util::thread_group::current_properties(); + *self.worker_handle.borrow_mut() = Some(std::thread::Builder::new() + .name("reset_to_version".to_string()) + .spawn(move || { + tikv_util::thread_group::set_properties(props); + tikv_alloc::add_thread_memory_accessor(); + + while worker.process_next_batch(BATCH_SIZE, &mut wb).expect("reset_to_version failed when removing invalid writes") { + } + *worker.state.lock() + .expect("failed to lock `ResetToVersionWorker::state` in `ResetToVersionWorker::process_next_batch`") + = ResetToVersionState::RemoveLock { scanned: 0 }; + while worker.process_next_batch_lock(BATCH_SIZE, &mut wb).expect("reset_to_version failed when removing invalid locks") { + } + *worker.state.lock() + .expect("failed to lock `ResetToVersionWorker::state` in `ResetToVersionWorker::process_next_batch_lock`") + = ResetToVersionState::Done; + + tikv_alloc::remove_thread_memory_accessor(); + }) + .expect("failed to spawn reset_to_version thread")); + } + + pub fn state(&self) -> ResetToVersionState { + self.state + .lock() + .expect("failed to lock `state` in `ResetToVersionManager::state`") + .clone() + } + + #[cfg(test)] + pub fn wait(&mut self) { + self.worker_handle.take().unwrap().join().unwrap(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use engine_rocks::raw::{ColumnFamilyOptions, DBOptions}; + use engine_rocks::raw_util::CFOptions; + use engine_rocks::Compat; + use engine_traits::{WriteBatch, WriteBatchExt}; + use engine_traits::{CF_LOCK, CF_RAFT}; + use tempfile::Builder; + use txn_types::{Lock, LockType, WriteType}; + + #[test] + fn test_basic() { + enum Expect { + Keep, + Remove, + } + + let tmp = Builder::new().prefix("test_basic").tempdir().unwrap(); + let path = tmp.path().to_str().unwrap(); + let fake_engine = Arc::new( + engine_rocks::raw_util::new_engine_opt( + path, + DBOptions::new(), + vec![ + CFOptions::new(CF_DEFAULT, ColumnFamilyOptions::new()), + CFOptions::new(CF_WRITE, ColumnFamilyOptions::new()), + CFOptions::new(CF_LOCK, ColumnFamilyOptions::new()), + CFOptions::new(CF_RAFT, ColumnFamilyOptions::new()), + ], + ) + .unwrap(), + ); + + let write = vec![ + // key, start_ts, commit_ts + (b"k", 104, 105, Expect::Remove), + (b"k", 102, 103, Expect::Remove), + (b"k", 100, 101, Expect::Keep), + (b"k", 98, 99, Expect::Keep), + ]; + let default = vec![ + // key, start_ts + (b"k", 104, Expect::Remove), + (b"k", 102, Expect::Remove), + (b"k", 100, Expect::Keep), + (b"k", 98, Expect::Keep), + ]; + let lock = vec![ + // key, start_ts, for_update_ts, lock_type, short_value, check + (b"k", 100, 0, LockType::Put, false, Expect::Remove), + (b"k", 100, 0, LockType::Delete, false, Expect::Keep), + (b"k", 100, 0, LockType::Put, true, Expect::Keep), + (b"k", 100, 0, LockType::Put, false, Expect::Keep), + (b"k", 100, 0, LockType::Put, false, Expect::Remove), + ]; + let mut kv = vec![]; + for (key, start_ts, commit_ts, expect) in write { + let write = Write::new(WriteType::Put, start_ts.into(), None); + kv.push(( + CF_WRITE, + Key::from_raw(key).append_ts(commit_ts.into()), + write.as_ref().to_bytes(), + expect, + )); + } + for (key, ts, expect) in default { + kv.push(( + CF_DEFAULT, + Key::from_raw(key).append_ts(ts.into()), + b"v".to_vec(), + expect, + )); + } + for (key, ts, for_update_ts, tp, short_value, expect) in lock { + let v = if short_value { + Some(b"v".to_vec()) + } else { + None + }; + let lock = Lock::new( + tp, + vec![], + ts.into(), + 0, + v, + for_update_ts.into(), + 0, + TimeStamp::zero(), + ); + kv.push((CF_LOCK, Key::from_raw(key), lock.to_bytes(), expect)); + } + let mut wb = fake_engine.c().write_batch(); + for &(cf, ref k, ref v, _) in &kv { + wb.put_cf(cf, &keys::data_key(k.as_encoded()), v).unwrap(); + } + wb.write().unwrap(); + + let mut manager = ResetToVersionManager::new(fake_engine.c().clone()); + manager.start(100.into()); + manager.wait(); + + let readopts = IterOptions::new(None, None, false); + let mut write_iter = fake_engine + .c() + .iterator_cf_opt(CF_WRITE, readopts.clone()) + .unwrap(); + write_iter.seek(SeekKey::Start).unwrap(); + let mut remaining_writes = vec![]; + while write_iter.valid().unwrap() { + let write = WriteRef::parse(write_iter.value()).unwrap().to_owned(); + let key = write_iter.key().to_vec(); + write_iter.next().unwrap(); + remaining_writes.push((key, write)); + } + let mut default_iter = fake_engine + .c() + .iterator_cf_opt(CF_DEFAULT, readopts.clone()) + .unwrap(); + default_iter.seek(SeekKey::Start).unwrap(); + let mut remaining_defaults = vec![]; + while default_iter.valid().unwrap() { + let key = default_iter.key().to_vec(); + let value = default_iter.value().to_vec(); + default_iter.next().unwrap(); + remaining_defaults.push((key, value)); + } + + let mut lock_iter = fake_engine.c().iterator_cf_opt(CF_LOCK, readopts).unwrap(); + lock_iter.seek(SeekKey::Start).unwrap(); + let mut remaining_locks = vec![]; + while lock_iter.valid().unwrap() { + let lock = Lock::parse(lock_iter.value()).unwrap().to_owned(); + let key = lock_iter.key().to_vec(); + lock_iter.next().unwrap(); + remaining_locks.push((key, lock)); + } + + assert_eq!(remaining_writes.len(), 1); + let (key, _) = &remaining_writes[0]; + assert_eq!( + Key::from_encoded(key.clone()).decode_ts().unwrap(), + 99.into() + ); + assert_eq!(remaining_defaults.len(), 1); + let (key, _) = &remaining_defaults[0]; + assert_eq!( + Key::from_encoded(key.clone()).decode_ts().unwrap(), + 98.into() + ); + assert!(remaining_locks.is_empty()); + } +} diff --git a/src/server/service/debug.rs b/src/server/service/debug.rs index 8165d2bdc443..0dde6e5e69f3 100644 --- a/src/server/service/debug.rs +++ b/src/server/service/debug.rs @@ -511,6 +511,16 @@ impl + 'static> debugpb::Debug f self.handle_response(ctx, sink, f, TAG); } + + fn reset_to_version( + &mut self, + _ctx: RpcContext<'_>, + req: ResetToVersionRequest, + sink: UnarySink, + ) { + self.debugger.reset_to_version(req.get_ts()); + sink.success(ResetToVersionResponse::default()); + } } fn region_detail>(