Skip to content

Commit

Permalink
Introduce two phase FAP (#360)
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinNeo authored Nov 29, 2023
1 parent 692929e commit 24c3802
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,14 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
let current = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
info!("fast path: applied first snapshot {}:{} {}, recover MsgAppend", self.store_id, region_id, peer_id;
info!("fast path: start applied first snapshot {}:{} {}", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
);

self.engine_store_server_helper.apply_fap_snapshot(region_id, peer_id);

info!("fast path: finished applied first snapshot {}:{} {}, recover MsgAppend", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
"cost_snapshot" => current.as_millis() - last,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl<T: Simulator<TiFlashEngine>> MixedCluster for Cluster<T> {
panic!(
"can't get value {:?} for key {}",
value.map(tikv_util::escape),
log_wrappers::hex_encode_upper(key)
log_wrappers::hex_encode_upper(key),
)
}
fn run_node(&mut self, node_id: u64) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ pub fn copy_meta_from<EK: engine_traits::KvEngine, ER: RaftEngine + RaftEngineDe
copy_raft_state: bool,
) -> raftstore::Result<()> {
let region_id = source.region.get_id();

let mut wb = target_engines.kv.write_batch();

// Can't copy this key, otherwise will cause a bootstrap.
Expand Down Expand Up @@ -146,7 +145,9 @@ pub fn copy_data_from(
|| source.region.get_end_key().is_empty())
{
debug!(
"copy_data_from write to region {} {:?} {:?} S {:?} E {:?}",
"copy_data_from {} to {} write to region {} {:?} {:?} S {:?} E {:?}",
source.peer.get_store_id(),
target.peer.get_store_id(),
region_id,
k,
v,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub struct EngineStoreServer {
pub mock_cfg: MockConfig,
pub region_states: RefCell<HashMap<RegionId, RegionStats>>,
pub page_storage: MockPageStorage,
// (region_id, peer_id) -> MockRegion
pub tmp_fap_regions: HashMap<(RegionId, u64), Box<MockRegion>>,
}

impl EngineStoreServer {
Expand All @@ -37,6 +39,7 @@ impl EngineStoreServer {
mock_cfg: MockConfig::default(),
region_states: RefCell::new(Default::default()),
page_storage: Default::default(),
tmp_fap_regions: Default::default(),
}
}

Expand Down Expand Up @@ -352,6 +355,7 @@ pub fn gen_engine_store_server_helper(
fn_set_pb_msg_by_bytes: Some(ffi_set_pb_msg_by_bytes),
fn_handle_safe_ts_update: Some(ffi_handle_safe_ts_update),
fn_fast_add_peer: Some(ffi_fast_add_peer),
fn_apply_fap_snapshot: Some(ffi_apply_fap_snapshot),
ps: PageStorageInterfaces {
fn_create_write_batch: Some(ffi_mockps_create_write_batch),
fn_wb_put_page: Some(ffi_mockps_wb_put_page),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,30 @@ use super::{
};
use crate::mock_cluster;

pub(crate) unsafe extern "C" fn ffi_apply_fap_snapshot(
arg1: *mut interfaces_ffi::EngineStoreServerWrap,
region_id: u64,
peer_id: u64,
) {
let store = into_engine_store_server_wrap(arg1);
let new_region = (*store.engine_store_server)
.tmp_fap_regions
.remove(&(region_id, peer_id))
.unwrap();
(*store.engine_store_server)
.kvstore
.insert(region_id, new_region);
let target_region = (*store.engine_store_server)
.kvstore
.get_mut(&region_id)
.unwrap();
crate::write_snapshot_to_db_data(
&mut (*store.engine_store_server),
target_region,
String::from("fast-add-peer"),
);
}

#[allow(clippy::redundant_closure_call)]
pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
arg1: *mut interfaces_ffi::EngineStoreServerWrap,
Expand Down Expand Up @@ -117,29 +141,20 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
ret = Some(failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::WaitForData));
return;
}

// TODO check commit_index and applied_index here
debug!("recover from remote peer: preparing from {} to {}, check target", from_store, store_id; "region_id" => region_id);
let new_region = make_new_region(
let mut new_region = make_new_region(
Some(new_region_meta.clone()),
Some((*store.engine_store_server).id),
);
(*store.engine_store_server)
.kvstore
.insert(region_id, Box::new(new_region));
let target_engines = match (*store.engine_store_server).engines.clone() {
Some(s) => s,
None => {
ret = Some(failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::OtherError));
return;
}
};
let target_region = match (*store.engine_store_server).kvstore.get_mut(&region_id) {
Some(s) => s,
None => {
ret = Some(failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::BadData));
return;
}
};
debug!("recover from remote peer: meta from {} to {}", from_store, store_id; "region_id" => region_id);
// Must first dump meta then data, otherwise data may lag behind.
// We can see a raft log hole at applied_index otherwise.
Expand All @@ -163,17 +178,13 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
&source_engines,
&target_engines,
&source_region,
target_region,
&mut new_region,
) {
error!("recover from remote peer: inject error {:?}", e; "region_id" => region_id);
ret = Some(failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::FailedInject));
return;
}
crate::write_snapshot_to_db_data(
&mut (*store.engine_store_server),
target_region,
String::from("fast-add-peer"),
);

if fail_after_write {
let mut raft_wb = target_engines.raft.log_batch(1024);
let mut entries: Vec<raft::eraftpb::Entry> = Default::default();
Expand All @@ -199,8 +210,11 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
let region_bytes = region_local_state.get_region().write_to_bytes().unwrap();
let apply_state_ptr = create_cpp_str(Some(apply_state_bytes));
let region_ptr = create_cpp_str(Some(region_bytes));

(*store.engine_store_server).tmp_fap_regions.insert((new_region_meta.get_id(), new_peer_id), Box::new(new_region));
// Check if we have commit_index.
debug!("recover from remote peer: ok from {} to {}", from_store, store_id; "region_id" => region_id);

ret = Some(interfaces_ffi::FastAddPeerRes {
status: interfaces_ffi::FastAddPeerStatus::Ok,
apply_state: apply_state_ptr,
Expand Down
5 changes: 5 additions & 0 deletions proxy_components/proxy_ffi/src/engine_store_helper_impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ impl EngineStoreServerHelper {
}
}

pub fn apply_fap_snapshot(&self, region_id: u64, peer_id: u64) {
debug_assert!(self.fn_apply_fap_snapshot.is_some());
unsafe { (self.fn_apply_fap_snapshot.into_inner())(self.inner, region_id, peer_id) }
}

pub fn handle_ingest_sst(
&self,
snaps: Vec<(&[u8], ColumnFamilyType)>,
Expand Down
16 changes: 15 additions & 1 deletion proxy_components/proxy_ffi/src/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,13 @@ pub mod root {
BadData = 4,
FailedInject = 5,
}
#[repr(u8)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum PrehandledSnapshotType {
None = 0,
Legacy = 1,
FAPCheckpoint = 2,
}
#[repr(C)]
#[derive(Debug)]
pub struct FastAddPeerRes {
Expand Down Expand Up @@ -626,6 +633,13 @@ pub mod root {
arg3: root::DB::RawCppPtrType,
),
>,
pub fn_apply_fap_snapshot: ::std::option::Option<
unsafe extern "C" fn(
arg1: *mut root::DB::EngineStoreServerWrap,
arg2: u64,
arg3: u64,
),
>,
pub fn_handle_http_request: ::std::option::Option<
unsafe extern "C" fn(
arg1: *mut root::DB::EngineStoreServerWrap,
Expand Down Expand Up @@ -695,7 +709,7 @@ pub mod root {
arg3: root::DB::RawVoidPtr,
) -> u32;
}
pub const RAFT_STORE_PROXY_VERSION: u64 = 3413295096116791749;
pub const RAFT_STORE_PROXY_VERSION: u64 = 8024556142803901851;
pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 324508639;
}
}
30 changes: 17 additions & 13 deletions proxy_tests/proxy/shared/fast_add_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ fn test_overlap_last_apply_old() {
// If a legacy snapshot is applied between fn_fast_add_peer and
// build_and_send_snapshot, it will override the previous snapshot's data, which
// is actually newer.

// It if origianlly https://github.com/pingcap/tidb-engine-ext/pull/359 before two-stage fap.
#[test]
fn test_overlap_apply_legacy_in_the_middle() {
let (mut cluster, pd_client) = new_mock_cluster_snap(0, 3);
Expand Down Expand Up @@ -275,19 +275,25 @@ fn test_overlap_apply_legacy_in_the_middle() {
states.in_disk_region_state.get_region().get_peers().len() == 3
},
);
std::thread::sleep(std::time::Duration::from_millis(1000));
std::thread::sleep(std::time::Duration::from_millis(500));
fail::cfg("fap_ffi_pause_after_fap_call", "pause").unwrap();
std::thread::sleep(std::time::Duration::from_millis(500));
fail::remove("fap_ffi_pause");

// std::thread::sleep(std::time::Duration::from_millis(5000));
iter_ffi_helpers(&cluster, Some(vec![3]), &mut |_, ffi: &mut FFIHelperSet| {
assert!(!ffi.engine_store_server.kvstore.contains_key(&1000));
});

// The snapshot for new_one_1000_k1 is in `tmp_fap_regions`.
// Raftstore v1 is mono store, so there could be v1 written by old_one_1_k3.
check_key_ex(
&cluster,
b"k1",
b"v13",
b"v1",
Some(false),
None,
Some(true),
Some(vec![3]),
None,
Some(new_one_1000_k1.get_id()),
true,
);

Expand All @@ -296,29 +302,27 @@ fn test_overlap_apply_legacy_in_the_middle() {
fail::remove("fap_mock_add_peer_from_id");
fail::remove("fap_on_msg_snapshot_1_3003");

// std::thread::sleep(std::time::Duration::from_millis(5000));
check_key_ex(
&cluster,
b"k1",
b"v1",
None,
Some(true),
Some(vec![3]),
None,
Some(new_one_1000_k1.get_id()),
true,
);
// Make FAP continue after the legacy snapshot is applied.
fail::remove("fap_ffi_pause_after_fap_call");
// TODO wait until fap finishes.
// std::thread::sleep(std::time::Duration::from_millis(5000));
std::thread::sleep(std::time::Duration::from_millis(2000));
check_key_ex(
&cluster,
b"k1",
b"v1",
b"v13",
None,
Some(true),
Some(vec![3]),
None,
Some(new_one_1000_k1.get_id()),
true,
);

Expand Down Expand Up @@ -575,7 +579,7 @@ fn simple_fast_add_peer(
// Re-add peer in store.
pd_client.must_add_peer(1, new_learner_peer(3, 4));
// Wait until Learner has applied ConfChange
std::thread::sleep(std::time::Duration::from_millis(1000));
std::thread::sleep(std::time::Duration::from_millis(2000));
must_wait_until_cond_node(
&cluster.cluster_ext,
1,
Expand Down
2 changes: 1 addition & 1 deletion raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#pragma once
#include <cstdint>
namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 3413295096116791749ull; }
namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 8024556142803901851ull; }
7 changes: 7 additions & 0 deletions raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ enum class FastAddPeerStatus : uint32_t {
FailedInject,
};

enum class PrehandledSnapshotType : uint8_t {
None = 0,
Legacy,
FAPCheckpoint,
};

struct FastAddPeerRes {
FastAddPeerStatus status;
CppStrWithView apply_state;
Expand Down Expand Up @@ -327,6 +333,7 @@ struct EngineStoreServerHelper {
uint64_t);
void (*fn_release_pre_handled_snapshot)(EngineStoreServerWrap *, RawVoidPtr,
RawCppPtrType);
void (*fn_apply_fap_snapshot)(EngineStoreServerWrap *, uint64_t, uint64_t);
HttpRequestRes (*fn_handle_http_request)(EngineStoreServerWrap *,
BaseBuffView path,
BaseBuffView query,
Expand Down

0 comments on commit 24c3802

Please sign in to comment.