Skip to content

Commit

Permalink
Add return to version
Browse files Browse the repository at this point in the history
ref tikv#11555
Signed-off-by: longfangsong <[email protected]>
  • Loading branch information
longfangsong committed Dec 17, 2021
1 parent 3b15b89 commit db86afd
Show file tree
Hide file tree
Showing 16 changed files with 515 additions and 37 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "5125fc1a69496b7
# When you modify TiKV cooperatively with kvproto, this will be useful to submit the PR to TiKV and the PR to
# kvproto at the same time.
# After the PR to kvproto is merged, remember to comment this out and run `cargo update -p kvproto`.
# [patch.'https://github.com/pingcap/kvproto']
# kvproto = {git = "https://github.com/your_github_id/kvproto", branch="your_branch"}
[patch.'https://github.com/pingcap/kvproto']
kvproto = {git = "https://github.com/longfangsong/kvproto", rev = "3a471681b15b41208000684f56d4deb5c311487d"}

[workspace]
# See https://github.com/rust-lang/rfcs/blob/master/text/2957-cargo-features2.md
Expand Down
13 changes: 13 additions & 0 deletions cmd/tikv-ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,8 @@ trait DebugExecutor {
fn dump_store_info(&self);

fn dump_cluster_info(&self);

fn reset_to_version(&self, version: u64);
}

impl DebugExecutor for DebugClient {
Expand Down Expand Up @@ -880,6 +882,13 @@ impl DebugExecutor for DebugClient {
.unwrap_or_else(|e| perror_and_exit("DebugClient::get_cluster_info", e));
println!("{}", resp.get_cluster_id())
}

fn reset_to_version(&self, version: u64) {
let mut req = ResetToVersionRequest::default();
req.set_ts(version);
self.reset_to_version(&ResetToVersionRequest::default())
.unwrap_or_else(|e| perror_and_exit("DebugClient::reset_to_version", e));
}
}

impl<ER: RaftEngine> DebugExecutor for Debugger<ER> {
Expand Down Expand Up @@ -1110,6 +1119,10 @@ impl<ER: RaftEngine> DebugExecutor for Debugger<ER> {
println!("cluster id: {}", ident.get_cluster_id());
}
}

fn reset_to_version(&self, version: u64) {
Debugger::reset_to_version(self, version);
}
}

fn warning_prompt(message: &str) -> bool {
Expand Down
1 change: 1 addition & 0 deletions components/external_storage/export/src/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ fn create_backend_inner(
return Err(bad_backend(Backend::CloudDynamic(dyn_backend.clone())));
}
},
Backend::AzureBlobStorage(_) => unimplemented!(),
#[cfg(not(any(feature = "cloud-gcp", feature = "cloud-aws")))]
_ => return Err(bad_backend(backend.clone())),
};
Expand Down
8 changes: 8 additions & 0 deletions src/server/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use tikv_util::worker::Worker;
use txn_types::Key;

use crate::config::ConfigController;
use crate::server::reset_to_version::ResetToVersionManager;
use crate::storage::mvcc::{Lock, LockType, TimeStamp, Write, WriteRef, WriteType};

pub use crate::storage::mvcc::MvccInfoIterator;
Expand Down Expand Up @@ -117,6 +118,7 @@ impl From<BottommostLevelCompaction> for debugpb::BottommostLevelCompaction {
#[derive(Clone)]
pub struct Debugger<ER: RaftEngine> {
engines: Engines<RocksEngine, ER>,
reset_to_version_manager: ResetToVersionManager,
cfg_controller: ConfigController,
}

Expand All @@ -125,8 +127,10 @@ impl<ER: RaftEngine> Debugger<ER> {
engines: Engines<RocksEngine, ER>,
cfg_controller: ConfigController,
) -> Debugger<ER> {
let reset_to_version_manager = ResetToVersionManager::new(engines.kv.clone());
Debugger {
engines,
reset_to_version_manager,
cfg_controller,
}
}
Expand Down Expand Up @@ -868,6 +872,10 @@ impl<ER: RaftEngine> Debugger<ER> {
&keys::data_end_key(end),
)
}

pub fn reset_to_version(&self, version: u64) {
self.reset_to_version_manager.start(version.into());
}
}

fn dump_mvcc_properties(db: &Arc<DB>, start: &[u8], end: &[u8]) -> Result<Vec<(String, String)>> {
Expand Down
1 change: 1 addition & 0 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod node;
mod proxy;
pub mod raftkv;
pub mod resolve;
mod reset_to_version;
pub mod server;
pub mod service;
pub mod snap;
Expand Down
5 changes: 3 additions & 2 deletions src/server/raft_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ impl Buffer for BatchMessageBuffer {
let mut msg_size = msg.start_key.len()
+ msg.end_key.len()
+ msg.get_message().context.len()
+ msg.extra_ctx.len();
+ msg.extra_ctx.len()
// index: 3, term: 2, data tag and size: 3, entry tag and size: 3
+ 11 * msg.get_message().get_entries().len();
for entry in msg.get_message().get_entries() {
msg_size += entry.data.len();
}
Expand Down Expand Up @@ -568,7 +570,6 @@ where

let cb = ChannelBuilder::new(self.builder.env.clone())
.stream_initial_window_size(self.builder.cfg.grpc_stream_initial_window_size.0 as i32)
.max_send_message_len(self.builder.cfg.max_grpc_send_msg_len)
.keepalive_time(self.builder.cfg.grpc_keepalive_time.0)
.keepalive_timeout(self.builder.cfg.grpc_keepalive_timeout.0)
.default_compression_algorithm(self.builder.cfg.grpc_compression_algorithm())
Expand Down
Loading

0 comments on commit db86afd

Please sign in to comment.