From 24c3802ffee888a64f82d8b477da53c367eee88b Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Wed, 29 Nov 2023 14:27:22 +0800 Subject: [PATCH] Introduce two phase FAP (#360) --- .../src/core/forward_raft/snapshot.rs | 9 +++- .../src/mock_cluster/v1/mod.rs | 2 +- .../src/mock_store/mock_core.rs | 5 +- .../mock_store/mock_engine_store_server.rs | 4 ++ .../mock_store/mock_fast_add_peer_impls.rs | 48 ++++++++++++------- .../src/engine_store_helper_impls.rs | 5 ++ proxy_components/proxy_ffi/src/interfaces.rs | 16 ++++++- proxy_tests/proxy/shared/fast_add_peer.rs | 30 +++++++----- .../ffi/src/RaftStoreProxyFFI/@version | 2 +- .../ffi/src/RaftStoreProxyFFI/ProxyFFI.h | 7 +++ 10 files changed, 92 insertions(+), 36 deletions(-) diff --git a/proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs b/proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs index a83f9ed1bed..d35b52190b5 100644 --- a/proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs +++ b/proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs @@ -277,7 +277,14 @@ impl ProxyForwarder { let current = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap(); - info!("fast path: applied first snapshot {}:{} {}, recover MsgAppend", self.store_id, region_id, peer_id; + info!("fast path: start applied first snapshot {}:{} {}", self.store_id, region_id, peer_id; + "snap_key" => ?snap_key, + "region_id" => region_id, + ); + + self.engine_store_server_helper.apply_fap_snapshot(region_id, peer_id); + + info!("fast path: finished applied first snapshot {}:{} {}, recover MsgAppend", self.store_id, region_id, peer_id; "snap_key" => ?snap_key, "region_id" => region_id, "cost_snapshot" => current.as_millis() - last, diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/mod.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/mod.rs index 09909301d37..89073a80970 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/mod.rs +++ 
b/proxy_components/mock-engine-store/src/mock_cluster/v1/mod.rs @@ -73,7 +73,7 @@ impl> MixedCluster for Cluster { panic!( "can't get value {:?} for key {}", value.map(tikv_util::escape), - log_wrappers::hex_encode_upper(key) + log_wrappers::hex_encode_upper(key), ) } fn run_node(&mut self, node_id: u64) { diff --git a/proxy_components/mock-engine-store/src/mock_store/mock_core.rs b/proxy_components/mock-engine-store/src/mock_store/mock_core.rs index 90b3af719ee..fcbe37f43bb 100644 --- a/proxy_components/mock-engine-store/src/mock_store/mock_core.rs +++ b/proxy_components/mock-engine-store/src/mock_store/mock_core.rs @@ -86,7 +86,6 @@ pub fn copy_meta_from raftstore::Result<()> { let region_id = source.region.get_id(); - let mut wb = target_engines.kv.write_batch(); // Can't copy this key, otherwise will cause a bootstrap. @@ -146,7 +145,9 @@ pub fn copy_data_from( || source.region.get_end_key().is_empty()) { debug!( - "copy_data_from write to region {} {:?} {:?} S {:?} E {:?}", + "copy_data_from {} to {} write to region {} {:?} {:?} S {:?} E {:?}", + source.peer.get_store_id(), + target.peer.get_store_id(), region_id, k, v, diff --git a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs index 48add84713f..01ef8895868 100644 --- a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs +++ b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs @@ -26,6 +26,8 @@ pub struct EngineStoreServer { pub mock_cfg: MockConfig, pub region_states: RefCell>, pub page_storage: MockPageStorage, + // (region_id, peer_id) -> MockRegion + pub tmp_fap_regions: HashMap<(RegionId, u64), Box>, } impl EngineStoreServer { @@ -37,6 +39,7 @@ impl EngineStoreServer { mock_cfg: MockConfig::default(), region_states: RefCell::new(Default::default()), page_storage: Default::default(), + tmp_fap_regions: Default::default(), } } @@ -352,6 +355,7 
@@ pub fn gen_engine_store_server_helper( fn_set_pb_msg_by_bytes: Some(ffi_set_pb_msg_by_bytes), fn_handle_safe_ts_update: Some(ffi_handle_safe_ts_update), fn_fast_add_peer: Some(ffi_fast_add_peer), + fn_apply_fap_snapshot: Some(ffi_apply_fap_snapshot), ps: PageStorageInterfaces { fn_create_write_batch: Some(ffi_mockps_create_write_batch), fn_wb_put_page: Some(ffi_mockps_wb_put_page), diff --git a/proxy_components/mock-engine-store/src/mock_store/mock_fast_add_peer_impls.rs b/proxy_components/mock-engine-store/src/mock_store/mock_fast_add_peer_impls.rs index ae28b0f06f2..6ef502ad311 100644 --- a/proxy_components/mock-engine-store/src/mock_store/mock_fast_add_peer_impls.rs +++ b/proxy_components/mock-engine-store/src/mock_store/mock_fast_add_peer_impls.rs @@ -9,6 +9,30 @@ use super::{ }; use crate::mock_cluster; +pub(crate) unsafe extern "C" fn ffi_apply_fap_snapshot( + arg1: *mut interfaces_ffi::EngineStoreServerWrap, + region_id: u64, + peer_id: u64, +) { + let store = into_engine_store_server_wrap(arg1); + let new_region = (*store.engine_store_server) + .tmp_fap_regions + .remove(&(region_id, peer_id)) + .unwrap(); + (*store.engine_store_server) + .kvstore + .insert(region_id, new_region); + let target_region = (*store.engine_store_server) + .kvstore + .get_mut(&region_id) + .unwrap(); + crate::write_snapshot_to_db_data( + &mut (*store.engine_store_server), + target_region, + String::from("fast-add-peer"), + ); +} + #[allow(clippy::redundant_closure_call)] pub(crate) unsafe extern "C" fn ffi_fast_add_peer( arg1: *mut interfaces_ffi::EngineStoreServerWrap, @@ -117,15 +141,13 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer( ret = Some(failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::WaitForData)); return; } + // TODO check commit_index and applied_index here debug!("recover from remote peer: preparing from {} to {}, check target", from_store, store_id; "region_id" => region_id); - let new_region = make_new_region( + let mut new_region = make_new_region( 
Some(new_region_meta.clone()), Some((*store.engine_store_server).id), ); - (*store.engine_store_server) - .kvstore - .insert(region_id, Box::new(new_region)); let target_engines = match (*store.engine_store_server).engines.clone() { Some(s) => s, None => { @@ -133,13 +155,6 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer( return; } }; - let target_region = match (*store.engine_store_server).kvstore.get_mut(&region_id) { - Some(s) => s, - None => { - ret = Some(failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::BadData)); - return; - } - }; debug!("recover from remote peer: meta from {} to {}", from_store, store_id; "region_id" => region_id); // Must first dump meta then data, otherwise data may lag behind. // We can see a raft log hole at applied_index otherwise. @@ -163,17 +178,13 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer( &source_engines, &target_engines, &source_region, - target_region, + &mut new_region, ) { error!("recover from remote peer: inject error {:?}", e; "region_id" => region_id); ret = Some(failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::FailedInject)); return; } - crate::write_snapshot_to_db_data( - &mut (*store.engine_store_server), - target_region, - String::from("fast-add-peer"), - ); + if fail_after_write { let mut raft_wb = target_engines.raft.log_batch(1024); let mut entries: Vec = Default::default(); @@ -199,8 +210,11 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer( let region_bytes = region_local_state.get_region().write_to_bytes().unwrap(); let apply_state_ptr = create_cpp_str(Some(apply_state_bytes)); let region_ptr = create_cpp_str(Some(region_bytes)); + + (*store.engine_store_server).tmp_fap_regions.insert((new_region_meta.get_id(), new_peer_id), Box::new(new_region)); // Check if we have commit_index. 
debug!("recover from remote peer: ok from {} to {}", from_store, store_id; "region_id" => region_id); + ret = Some(interfaces_ffi::FastAddPeerRes { status: interfaces_ffi::FastAddPeerStatus::Ok, apply_state: apply_state_ptr, diff --git a/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs b/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs index 08fc670e7fd..f5633983896 100644 --- a/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs +++ b/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs @@ -220,6 +220,11 @@ impl EngineStoreServerHelper { } } + pub fn apply_fap_snapshot(&self, region_id: u64, peer_id: u64) { + debug_assert!(self.fn_apply_fap_snapshot.is_some()); + unsafe { (self.fn_apply_fap_snapshot.into_inner())(self.inner, region_id, peer_id) } + } + pub fn handle_ingest_sst( &self, snaps: Vec<(&[u8], ColumnFamilyType)>, diff --git a/proxy_components/proxy_ffi/src/interfaces.rs b/proxy_components/proxy_ffi/src/interfaces.rs index fbdd2fcf173..cb100adcfe9 100644 --- a/proxy_components/proxy_ffi/src/interfaces.rs +++ b/proxy_components/proxy_ffi/src/interfaces.rs @@ -335,6 +335,13 @@ pub mod root { BadData = 4, FailedInject = 5, } + #[repr(u8)] + #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] + pub enum PrehandledSnapshotType { + None = 0, + Legacy = 1, + FAPCheckpoint = 2, + } #[repr(C)] #[derive(Debug)] pub struct FastAddPeerRes { @@ -626,6 +633,13 @@ pub mod root { arg3: root::DB::RawCppPtrType, ), >, + pub fn_apply_fap_snapshot: ::std::option::Option< + unsafe extern "C" fn( + arg1: *mut root::DB::EngineStoreServerWrap, + arg2: u64, + arg3: u64, + ), + >, pub fn_handle_http_request: ::std::option::Option< unsafe extern "C" fn( arg1: *mut root::DB::EngineStoreServerWrap, @@ -695,7 +709,7 @@ pub mod root { arg3: root::DB::RawVoidPtr, ) -> u32; } - pub const RAFT_STORE_PROXY_VERSION: u64 = 3413295096116791749; + pub const RAFT_STORE_PROXY_VERSION: u64 = 8024556142803901851; pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 
324508639; } } diff --git a/proxy_tests/proxy/shared/fast_add_peer.rs b/proxy_tests/proxy/shared/fast_add_peer.rs index 7c4ba2afa05..8e9220a3dfd 100644 --- a/proxy_tests/proxy/shared/fast_add_peer.rs +++ b/proxy_tests/proxy/shared/fast_add_peer.rs @@ -188,7 +188,7 @@ fn test_overlap_last_apply_old() { // If a legacy snapshot is applied between fn_fast_add_peer and // build_and_send_snapshot, it will override the previous snapshot's data, which // is actually newer. - +// It was originally https://github.com/pingcap/tidb-engine-ext/pull/359 before two-stage fap. #[test] fn test_overlap_apply_legacy_in_the_middle() { let (mut cluster, pd_client) = new_mock_cluster_snap(0, 3); @@ -275,19 +275,25 @@ fn test_overlap_apply_legacy_in_the_middle() { states.in_disk_region_state.get_region().get_peers().len() == 3 }, ); - std::thread::sleep(std::time::Duration::from_millis(1000)); + std::thread::sleep(std::time::Duration::from_millis(500)); fail::cfg("fap_ffi_pause_after_fap_call", "pause").unwrap(); + std::thread::sleep(std::time::Duration::from_millis(500)); fail::remove("fap_ffi_pause"); - // std::thread::sleep(std::time::Duration::from_millis(5000)); + iter_ffi_helpers(&cluster, Some(vec![3]), &mut |_, ffi: &mut FFIHelperSet| { + assert!(!ffi.engine_store_server.kvstore.contains_key(&1000)); + }); + + // The snapshot for new_one_1000_k1 is in `tmp_fap_regions`. + // Raftstore v1 is mono store, so there could be v1 written by old_one_1_k3. 
check_key_ex( &cluster, b"k1", - b"v13", + b"v1", + Some(false), None, - Some(true), Some(vec![3]), - None, + Some(new_one_1000_k1.get_id()), true, ); @@ -296,7 +302,6 @@ fn test_overlap_apply_legacy_in_the_middle() { fail::remove("fap_mock_add_peer_from_id"); fail::remove("fap_on_msg_snapshot_1_3003"); - // std::thread::sleep(std::time::Duration::from_millis(5000)); check_key_ex( &cluster, b"k1", @@ -304,21 +309,20 @@ fn test_overlap_apply_legacy_in_the_middle() { None, Some(true), Some(vec![3]), - None, + Some(new_one_1000_k1.get_id()), true, ); // Make FAP continue after the legacy snapshot is applied. fail::remove("fap_ffi_pause_after_fap_call"); - // TODO wait until fap finishes. - // std::thread::sleep(std::time::Duration::from_millis(5000)); + std::thread::sleep(std::time::Duration::from_millis(2000)); check_key_ex( &cluster, b"k1", - b"v1", + b"v13", None, Some(true), Some(vec![3]), - None, + Some(new_one_1000_k1.get_id()), true, ); @@ -575,7 +579,7 @@ fn simple_fast_add_peer( // Re-add peer in store. 
pd_client.must_add_peer(1, new_learner_peer(3, 4)); // Wait until Learner has applied ConfChange - std::thread::sleep(std::time::Duration::from_millis(1000)); + std::thread::sleep(std::time::Duration::from_millis(2000)); must_wait_until_cond_node( &cluster.cluster_ext, 1, diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version index 21eddea6cfc..3ef2e24f395 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version @@ -1,3 +1,3 @@ #pragma once #include -namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 3413295096116791749ull; } +namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 8024556142803901851ull; } \ No newline at end of file diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h index da9236abc79..0ecdce222c9 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h @@ -227,6 +227,12 @@ enum class FastAddPeerStatus : uint32_t { FailedInject, }; +enum class PrehandledSnapshotType : uint8_t { + None = 0, + Legacy, + FAPCheckpoint, +}; + struct FastAddPeerRes { FastAddPeerStatus status; CppStrWithView apply_state; @@ -327,6 +333,7 @@ struct EngineStoreServerHelper { uint64_t); void (*fn_release_pre_handled_snapshot)(EngineStoreServerWrap *, RawVoidPtr, RawCppPtrType); + void (*fn_apply_fap_snapshot)(EngineStoreServerWrap *, uint64_t, uint64_t); HttpRequestRes (*fn_handle_http_request)(EngineStoreServerWrap *, BaseBuffView path, BaseBuffView query,