Commit 48da47f

test added
Signed-off-by: Calvin Neo <[email protected]>
CalvinNeo committed Aug 12, 2024
1 parent 51d55c1 commit 48da47f
Showing 6 changed files with 155 additions and 28 deletions.
@@ -43,8 +43,12 @@ impl MockRegion {
#[derive(Default)]
pub struct RegionStats {
pub pre_handle_count: AtomicU64,
// Count of calls to `ffi_fast_add_peer`.
pub fast_add_peer_count: AtomicU64,
pub apply_snap_count: AtomicU64,
// Count of FAP builds that have finished, whether they succeeded or not.
pub finished_fast_add_peer_count: AtomicU64,
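// Ids of regions for which a FAP has been started.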
pub started_fast_add_peers: std::sync::Mutex<HashSet<u64>>,
}

// In case of newly added cfs.
@@ -76,6 +76,20 @@ impl EngineStoreServer {
f(self.region_states.borrow_mut().get_mut(&region_id).unwrap())
}

pub fn mutate_region_states_mut<F: FnMut(&mut RegionStats)>(
&self,
region_id: RegionId,
mut f: F,
) {
let has = self.region_states.borrow().contains_key(&region_id);
if !has {
self.region_states
.borrow_mut()
.insert(region_id, Default::default());
}
f(self.region_states.borrow_mut().get_mut(&region_id).unwrap())
}

pub fn get_mem(
&self,
region_id: u64,
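A minimal usage sketch for the new helper (hypothetical snippet; `server` stands in for an `EngineStoreServer` instance, mirroring how the test below reads the counters):

// Unlike `mutate_region_states`, this variant first inserts a
// default-initialized `RegionStats` entry when the key is missing,
// so it is safe to call for a region the mock server has not seen yet.
let mut finished: u64 = 0;
server.mutate_region_states_mut(1, |e: &mut RegionStats| {
    finished = e.finished_fast_add_peer_count.load(Ordering::SeqCst);
});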
@@ -125,6 +125,7 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
let store_id = (*store.engine_store_server).id;
(*store.engine_store_server).mutate_region_states(region_id, |e: &mut RegionStats| {
e.fast_add_peer_count.fetch_add(1, Ordering::SeqCst);
e.started_fast_add_peers.lock().unwrap().insert(region_id);
});

let failed_add_peer_res =
@@ -147,6 +148,13 @@
});
0
})() != 0;
let force_wait_for_data: bool = (|| {
fail::fail_point!("fap_mock_force_wait_for_data", |t| {
let t = t.unwrap().parse::<u64>().unwrap();
t
});
0
})() != 0;
let fail_after_write: bool = (|| {
fail::fail_point!("fap_mock_fail_after_write", |t| {
let t = t.unwrap().parse::<u64>().unwrap();
@@ -156,6 +164,10 @@
})() != 0;
debug!("recover from remote peer: enter from {} to {}", from_store, store_id; "region_id" => region_id);

if force_wait_for_data {
debug!("recover from remote peer: force_wait_for_data from {} to {}", from_store, store_id; "region_id" => region_id);
return failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::WaitForData);
}
for retry in 0..300 {
let mut ret: Option<interfaces_ffi::FastAddPeerRes> = None;
if retry > 0 {
@@ -309,13 +321,33 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
if block_wait {
continue;
} else {
(*store.engine_store_server).mutate_region_states(
region_id,
|e: &mut RegionStats| {
e.finished_fast_add_peer_count
.fetch_add(1, Ordering::SeqCst);
},
);
return r;
}
}
_ => return r,
_ => {
(*store.engine_store_server).mutate_region_states(
region_id,
|e: &mut RegionStats| {
e.finished_fast_add_peer_count
.fetch_add(1, Ordering::SeqCst);
},
);
return r;
}
}
}
}
error!("recover from remote peer: failed after retry"; "region_id" => region_id);
(*store.engine_store_server).mutate_region_states(region_id, |e: &mut RegionStats| {
e.finished_fast_add_peer_count
.fetch_add(1, Ordering::SeqCst);
});
failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::BadData)
}
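The mock fail points above all follow the `fail` crate's return-payload pattern: a test installs a payload string with `fail::cfg`, and the closure passed to `fail_point!` parses it into a flag. A sketch of how the new point is toggled, matching its use in the test below:

// "return(1)" hands the closure the string "1", which parses to a
// non-zero u64, so ffi_fast_add_peer replies WaitForData immediately
// instead of copying region data from the source store.
fail::cfg("fap_mock_force_wait_for_data", "return(1)").unwrap();
// ... drive the cluster while FAP is held in the WaitForData state ...
fail::remove("fap_mock_force_wait_for_data");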
96 changes: 96 additions & 0 deletions proxy_tests/proxy/shared/fast_add_peer/fp.rs
@@ -944,3 +944,99 @@ fn test_single_replica_migrate() {
fail::remove("on_pre_write_apply_state");
cluster.shutdown();
}

// Test the case where a MsgSnapshot arrives before MsgAppend.
#[test]
fn test_msgsnapshot_before_msgappend() {
let (mut cluster, pd_client) = new_mock_cluster_snap(0, 2);
pd_client.disable_default_operator();
fail::cfg("post_apply_snapshot_allow_no_unips", "return").unwrap();
cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true;

tikv_util::set_panic_hook(true, "./");
// Can always apply snapshot immediately
fail::cfg("on_can_apply_snapshot", "return(true)").unwrap();
fail::cfg("on_pre_write_apply_state", "return").unwrap();

let _ = cluster.run_conf_change();

cluster.must_put(b"k1", b"v1");
check_key(&cluster, b"k1", b"v1", Some(true), None, Some(vec![1]));
cluster.must_put(b"k2", b"v2");

// cluster.add_send_filter(CloneFilterFactory(
// RegionPacketFilter::new(1, 2)
// .msg_type(MessageType::MsgAppend)
// .direction(Direction::Recv),
// ));
fail::cfg("fap_mock_force_wait_for_data", "return(1)").unwrap();
pd_client.must_add_peer(1, new_learner_peer(2, 2));

std::thread::sleep(Duration::from_secs(1));

let region = cluster.get_region("k1".as_bytes());
let prev_state = maybe_collect_states(&cluster.cluster_ext, 1, Some(vec![1]));
let (compact_index, compact_term) = get_valid_compact_index(&prev_state);
debug!("compact at index {}", compact_index);
let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term);
let req = test_raftstore::new_admin_request(1, region.get_region_epoch(), compact_log);
let _res = cluster
.call_command_on_leader(req, Duration::from_secs(3))
.unwrap();

let mut t = 0;
loop {
let mut buf = Vec::<raft::eraftpb::Entry>::new();
cluster
.get_engines(1)
.raft
.get_all_entries_to(1, &mut buf)
.unwrap();
if buf.len() == 1 {
break;
}
std::thread::sleep(std::time::Duration::from_secs(1));
t += 1;
assert!(t < 11);
}

fail::remove("fap_mock_force_wait_for_data");
cluster.clear_send_filters();

pd_client.must_add_peer(1, new_learner_peer(2, 2));

iter_ffi_helpers(&cluster, Some(vec![2]), &mut |_, ffi: &mut FFIHelperSet| {
let mut x: u64 = 0;
let mut y: u64 = 0;
(*ffi.engine_store_server).mutate_region_states_mut(1, |e: &mut RegionStats| {
x = e.finished_fast_add_peer_count.load(Ordering::SeqCst);
});
(*ffi.engine_store_server).mutate_region_states_mut(1, |e: &mut RegionStats| {
y = e.started_fast_add_peers.lock().unwrap().len() as u64;
});
assert_eq!(x, y);
});

// FAP will fail with "can't find entry for index 9 of region 1".
check_key(&cluster, b"k2", b"v2", Some(true), None, Some(vec![1, 2]));
must_wait_until_cond_node(
&cluster.cluster_ext,
1,
Some(vec![2]),
&|states: &States| -> bool {
find_peer_by_id(states.in_disk_region_state.get_region(), 2).is_some()
},
);

iter_ffi_helpers(&cluster, Some(vec![2]), &mut |_, ffi: &mut FFIHelperSet| {
assert_eq!(
ffi.engine_store_server_helper
.query_fap_snapshot_state(1, 2, 0, 0),
proxy_ffi::interfaces_ffi::FapSnapshotState::NotFound
);
});

fail::remove("on_can_apply_snapshot");
fail::remove("on_pre_write_apply_state");
cluster.shutdown();
}
6 changes: 2 additions & 4 deletions proxy_tests/proxy/shared/pprof_jemalloc.rs
@@ -4,8 +4,6 @@ use std::path::Path;

use tempfile::NamedTempFile;

use crate::utils::v1::*;

#[test]
fn test_adhoc_dump_prof() {
use proxy_server::status_server::vendored_utils::{
@@ -17,8 +15,8 @@ fn test_adhoc_dump_prof() {
let _ = activate_prof();
}

let x = vec![1; 1000];
let y = vec![1; 1000];
let _x = vec![1; 1000];
let _y = vec![1; 1000];

let f = NamedTempFile::new().unwrap();
let path = f.path().to_str().unwrap();
29 changes: 6 additions & 23 deletions proxy_tests/proxy/shared/replica_read.rs
@@ -7,7 +7,7 @@ use engine_store_ffi::ffi::{
ProtoMsgBaseBuff,
};

use crate::{shared::ffi, utils::v1::*};
use crate::utils::v1::*;

#[derive(Default)]
struct GcMonitor {
@@ -230,7 +230,7 @@ fn test_read_index_applying() {
cluster.must_put(b"k0", b"v");
{
let prev_state = maybe_collect_states(&cluster.cluster_ext, r1, Some(vec![1]));
let (compact_index, compact_term) = get_valid_compact_index_by(&prev_state, Some(vec![1]));
let _ = get_valid_compact_index_by(&prev_state, Some(vec![1]));
}
cluster.pd_client.must_none_pending_peer(p2.clone());
// assert_eq!(cluster.pd_client.get_pending_peers().len(), 0);
@@ -360,22 +360,21 @@ fn test_util() {
use kvproto::{
kvrpcpb::{Context, DiskFullOpt, KeyRange},
raft_cmdpb::{CmdType, RaftCmdRequest, RaftRequestHeader, Request as RaftRequest},
raft_serverpb::RaftMessage,
};
use raftstore::{
router::RaftStoreRouter,
store::{msg::Callback, RaftCmdExtraOpts, ReadIndexContext},
};
use tokio::sync::oneshot;
use txn_types::{Key, Lock, LockType, TimeStamp};
use txn_types::{Key, Lock, LockType};
use uuid::Uuid;

use crate::utils::v1_server::{new_server_cluster, ChannelBuilder, Environment, TikvClient};
use crate::utils::v1_server::new_server_cluster;

// https://github.com/tikv/tikv/issues/16823
#[test]
fn test_raft_cmd_request_cant_advanve_max_ts() {
use kvproto::kvrpcpb::{ReadIndexRequest, ReadIndexResponse};
use kvproto::kvrpcpb::ReadIndexRequest;

let mut cluster = new_server_cluster(0, 1);
cluster.run();
@@ -384,10 +383,6 @@

let region = cluster.get_region(b"");
let leader = region.get_peers()[0].clone();
let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned();

let env = Arc::new(Environment::new(1));
let channel = ChannelBuilder::new(env).connect(&addr);

let mut ctx = Context::default();
let region_id = leader.get_id();
@@ -468,7 +463,6 @@ fn test_raft_cmd_request_learner_advanve_max_ts() {
let region = cluster.get_region(b"");
assert_eq!(region_id, 1);
assert_eq!(region.get_id(), 1);
let leader = region.get_peers()[0].clone();

fail::cfg("on_pre_write_apply_state", "return(true)").unwrap();
let learner = new_learner_peer(2, 2);
@@ -493,11 +487,6 @@
);
guards[0].with_lock(|l| *l = Some(lock.clone()));

let addr = cluster.sim.rl().get_addr(learner.get_store_id()).to_owned();

let env = Arc::new(Environment::new(1));
let channel = ChannelBuilder::new(env).connect(&addr);

// cluster.must_put(b"k", b"v");

let read_index = |ranges: &[(&[u8], &[u8])]| {
@@ -584,12 +573,6 @@ fn test_raft_message_can_advanve_max_ts() {
let region = cluster.get_region(b"");
let leader = region.get_peers()[0].clone();
let follower = new_learner_peer(2, 2);
let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned();

let env = Arc::new(Environment::new(1));
let channel = ChannelBuilder::new(env).connect(&addr);

let region_id = leader.get_id();

let read_index = |ranges: &[(&[u8], &[u8])]| {
let mut m = raft::eraftpb::Message::default();
@@ -627,7 +610,7 @@

// wait a while until the node updates its own max ts
let prev_cm_max_ts = cm.max_ts();
let (resp, start_ts) = read_index(&[(b"l", b"yz")]);
let (_, start_ts) = read_index(&[(b"l", b"yz")]);
cluster.must_put(b"a", b"b");
std::thread::sleep(Duration::from_millis(2000));
// assert!(!resp.has_locked());
Expand Down
