-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
check raftcmd term. #1328
check raftcmd term. #1328
Conversation
@@ -298,7 +298,10 @@ impl StoreHandler { | |||
resp.set_cmd_gc_resp(gc); | |||
} | |||
|
|||
pub fn on_request(&self, req: Request, on_resp: OnResponse) -> Result<()> { | |||
pub fn on_request(&self, mut req: Request, on_resp: OnResponse) -> Result<()> { | |||
// Go client may set it to 0 if using gogoprotobuf. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why go client needs term?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Context is sent from client.
Ready to review. |
pub fn on_request(&self, req: Request, on_resp: OnResponse) -> Result<()> { | ||
pub fn on_request(&self, mut req: Request, on_resp: OnResponse) -> Result<()> { | ||
// Go client may set it to 0 if using gogoprotobuf. | ||
req.mut_context().clear_term(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why clear term here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gogoprotobuf will always set it, then we cannot use has_term()
. See pingcap/kvproto#97
@@ -95,7 +95,7 @@ pub struct RaftKv<S: RaftStoreRouter + 'static> { | |||
|
|||
enum CmdRes { | |||
Resp(Vec<Response>), | |||
Snap(RegionSnapshot), | |||
Snap((RegionSnapshot, u64)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer using a field structure instead of tuple here.
@@ -237,17 +240,23 @@ impl<S: RaftStoreRouter> Engine for RaftKv<S> { | |||
ASYNC_REQUESTS_COUNTER_VEC.with_label_values(&["snapshot", "all"]).inc(); | |||
let req_timer = ASYNC_REQUESTS_DURATIONS_VEC.with_label_values(&["snapshot"]).start_timer(); | |||
|
|||
let mut ctx2 = ctx.clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does clone hurt the performance?
@siddontang PTAL. I've add |
let ctx = &mut self.cmd_ctxs.get_mut(&cid).unwrap(); | ||
assert_eq!(ctx.cid, cid); | ||
ctx.cmd.take().unwrap() | ||
}; | ||
if let Some(term) = cb_ctx.term { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seem you can use cmd.set_context(cb_ctx) directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CbContext
is newly defined which is not the same type as Context
.
@@ -147,7 +147,7 @@ impl BatchRunnable<Task> for Host { | |||
let id = self.last_req_id; | |||
let sched = self.sched.clone(); | |||
if let Err(e) = self.engine.async_snapshot(reqs[0].req.get_context(), | |||
box move |res| { | |||
box move |(_cb_ctx, res)| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use _
directly.
@@ -1100,6 +1100,12 @@ impl<T: Transport, C: PdClient> Store<T, C> { | |||
return Err(box_err!("mismatch peer id {} != {}", peer.peer_id(), peer_id)); | |||
} | |||
|
|||
let header = msg.get_header(); | |||
// If header's term is 2 verions behind current term, leadership may have been changed away. | |||
if header.has_term() && peer.term() > header.get_term() + 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use get_term() != 0 && ...
instead.
pub type Callback<T> = Box<FnBox(Result<T>) + Send>; | ||
pub type Callback<T> = Box<FnBox((CbContext, Result<T>)) + Send>; | ||
|
||
pub struct CbContext { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why define another CbContext
? It change the API a lot. We have many context already, add another one just makes things complicated and confusing. I think it's ugly and unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First of all, old API does not support underlying engine to return more data other than Result
, we need to change it no matter we reuse Context
or not.
In addition, the content of the two Context are different, and subsequent changes will make them more different. I see no reason to reuse it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not saying reuse the Context
. You can check the Result
to get the actual term.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, my next PR need it return extra data no matter the result is Ok or Err.
PTAL @siddontang @BusyJay |
LGTM |
@@ -1100,6 +1100,12 @@ impl<T: Transport, C: PdClient> Store<T, C> { | |||
return Err(box_err!("mismatch peer id {} != {}", peer.peer_id(), peer_id)); | |||
} | |||
|
|||
let header = msg.get_header(); | |||
// If header's term is 2 verions behind current term, leadership may have been changed away. | |||
if header.get_term() > 0 && peer.term() > header.get_term() + 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use peer.term() != hader.get_term()
directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
term may increase without leader changed, we have already checked it is leader, so this case is okay.
LGTM |
# Conflicts: # Cargo.lock
let leader = self.get_peer_from_cache(self.leader_id()); | ||
let not_leader = Error::NotLeader(self.region_id, leader); | ||
let resp = cmd_resp::err_resp(not_leader, cmd.uuid, self.term()); | ||
/// Call the callback of `cmd` that is stale. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please explain in the comment what does stale mean.
@@ -33,7 +33,17 @@ pub const TEMP_DIR: &'static str = ""; | |||
const SEEK_BOUND: usize = 30; | |||
const DEFAULT_TIMEOUT_SECS: u64 = 5; | |||
|
|||
pub type Callback<T> = Box<FnBox(Result<T>) + Send>; | |||
pub type Callback<T> = Box<FnBox((CbContext, Result<T>)) + Send>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why use tuple?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wait_op!()
will be broken if not use tuple.
@@ -149,6 +155,9 @@ impl<S: RaftStoreRouter> RaftKv<S> { | |||
header.set_region_epoch(ctx.get_region_epoch().clone()); | |||
header.set_uuid(Uuid::new_v4().as_bytes().to_vec()); | |||
header.set_read_quorum(ctx.get_read_quorum()); | |||
if ctx.has_term() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use get_term() != 0
instead.
assert!(engine.write(&ctx, vec![]).is_err()); | ||
// Term not match. | ||
cluster.must_transfer_leader(region.get_id(), peers[0].clone()); | ||
assert!(engine.write(&ctx, vec![]).is_err()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do you know the error due to term not matched?
PTAL @BusyJay |
cluster.must_transfer_leader(region.get_id(), peers[0].clone()); | ||
block.store(false, Ordering::SeqCst); | ||
|
||
rx.recv().unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use recv_timeout
instead.
@@ -114,6 +114,9 @@ quick_error!{ | |||
description("region is stale") | |||
display("StaleEpoch {}", msg) | |||
} | |||
StaleCommand { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What will client do when encounter StaleCommand
? Will it backoff?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Backoff then retry.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think backoff is necessary here, because the new leader should be elected, it can retry immediately.
LGTM |
Fix #1317
Still working on detailed test.