Skip to content

Commit

Permalink
r/vote_stm: guard from concurrent elections
Browse files Browse the repository at this point in the history
Use a lock held by critical part of the voting process,
released when decision is made
  • Loading branch information
bashtanov committed Jul 16, 2024
1 parent dc78dfd commit 2223844
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 69 deletions.
15 changes: 11 additions & 4 deletions src/v/raft/consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ class consensus {
bool _transferring_leadership{false};

/// useful for when we are not the leader
clock_type::time_point _hbeat = clock_type::now();
clock_type::time_point _hbeat = clock_type::now(); // is max() iff leader
clock_type::time_point _became_leader_at = clock_type::now();
clock_type::time_point _instantiated_at = clock_type::now();

Expand All @@ -845,15 +845,22 @@ class consensus {
/// used to wait for background ops before shutting down
ss::gate _bg;

/**
* Locks listed in the order of nestedness, election being the outermost
* and snapshot the innermost. I.e. if any of these locks are used at the
* same time, they should be acquired in the listed order and released in
* reverse order.
*/
/// guards from concurrent election where this instance is a candidate
mutex _election_lock{"consensus::election_lock"};
/// all raft operations must happen exclusively since the common case
/// is for the operation to touch the disk
mutex _op_lock{"consensus::op_lock"};
/// since snapshot state is orthogonal to raft state when writing snapshot
/// it is enough to grab the snapshot mutex, there is no need to keep
/// oplock, if the two locks are expected to be acquired at the same time
/// the snapshot lock should always be an internal (taken after the
/// _op_lock)
/// oplock
mutex _snapshot_lock{"consensus::snapshot_lock"};

/// used for notifying when commits happened to log
event_manager _event_manager;
std::unique_ptr<probe> _probe;
Expand Down
133 changes: 68 additions & 65 deletions src/v/raft/vote_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,71 +126,74 @@ ss::future<election_success> vote_stm::vote(bool leadership_transfer) {
proceed_with_election,
immediate_success,
};
return _ptr->_op_lock
.with([this, leadership_transfer] {
_config = _ptr->config();
// check again while under op_sem
if (_ptr->should_skip_vote(leadership_transfer)) {
return ss::make_ready_future<prepare_election_result>(
prepare_election_result::skip_election);
}
// 5.2.1 mark node as candidate, and update leader id
_ptr->_vstate = consensus::vote_state::candidate;
// only trigger notification when we had a leader previously
if (_ptr->_leader_id) {
_ptr->_leader_id = std::nullopt;
_ptr->trigger_leadership_notification();
}

if (_prevote && leadership_transfer) {
return ssx::now(prepare_election_result::immediate_success);
}

// 5.2.1.2
/**
* Pre-voting doesn't increase the term
*/
if (!_prevote) {
_ptr->_term += model::term_id(1);
_ptr->_voted_for = {};
}

// special case, it may happen that node requesting votes is not a
// voter, it may happen if it is a learner in previous configuration
_replies.emplace(_ptr->_self, *this);

// vote is the only method under _op_sem
_config->for_each_voter(
[this](vnode id) { _replies.emplace(id, *this); });

auto lstats = _ptr->_log->offsets();
auto last_entry_term = _ptr->get_last_entry_term(lstats);

_req = vote_request{
.node_id = _ptr->_self,
.group = _ptr->group(),
.term = _ptr->term(),
.prev_log_index = lstats.dirty_offset,
.prev_log_term = last_entry_term,
.leadership_transfer = leadership_transfer};
// we have to self vote before dispatching vote request to
// other nodes, this vote has to be done under op semaphore as
// it changes voted_for state
return self_vote().then(
[] { return prepare_election_result::proceed_with_election; });
})
.then([this](prepare_election_result result) {
switch (result) {
case prepare_election_result::skip_election:
return ss::make_ready_future<election_success>(
election_success::no);
case prepare_election_result::proceed_with_election:
return do_vote();
case prepare_election_result::immediate_success:
return ss::make_ready_future<election_success>(
election_success::yes);
}
});
return _ptr->_election_lock.with([this, leadership_transfer] {
return _ptr->_op_lock
.with([this, leadership_transfer] {
_config = _ptr->config();
// check again while under op_sem
if (_ptr->should_skip_vote(leadership_transfer)) {
return ss::make_ready_future<prepare_election_result>(
prepare_election_result::skip_election);
}
// 5.2.1 mark node as candidate, and update leader id
_ptr->_vstate = consensus::vote_state::candidate;
// only trigger notification when we had a leader previously
if (_ptr->_leader_id) {
_ptr->_leader_id = std::nullopt;
_ptr->trigger_leadership_notification();
}

if (_prevote && leadership_transfer) {
return ssx::now(prepare_election_result::immediate_success);
}

// 5.2.1.2
/**
* Pre-voting doesn't increase the term
*/
if (!_prevote) {
_ptr->_term += model::term_id(1);
_ptr->_voted_for = {};
}

// special case, it may happen that node requesting votes is not a
// voter, it may happen if it is a learner in previous
// configuration
_replies.emplace(_ptr->_self, *this);

// vote is the only method under _op_sem
_config->for_each_voter(
[this](vnode id) { _replies.emplace(id, *this); });

auto lstats = _ptr->_log->offsets();
auto last_entry_term = _ptr->get_last_entry_term(lstats);

_req = vote_request{
.node_id = _ptr->_self,
.group = _ptr->group(),
.term = _ptr->term(),
.prev_log_index = lstats.dirty_offset,
.prev_log_term = last_entry_term,
.leadership_transfer = leadership_transfer};
// we have to self vote before dispatching vote request to
// other nodes, this vote has to be done under op semaphore as
// it changes voted_for state
return self_vote().then(
[] { return prepare_election_result::proceed_with_election; });
})
.then([this](prepare_election_result result) {
switch (result) {
case prepare_election_result::skip_election:
return ss::make_ready_future<election_success>(
election_success::no);
case prepare_election_result::proceed_with_election:
return do_vote();
case prepare_election_result::immediate_success:
return ss::make_ready_future<election_success>(
election_success::yes);
}
});
});
}

ss::future<election_success> vote_stm::do_vote() {
Expand Down

0 comments on commit 2223844

Please sign in to comment.