Skip to content

Commit

Permalink
made log async
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Gardham-Pallister committed May 2, 2018
1 parent f31b8fd commit fc66e5a
Show file tree
Hide file tree
Showing 14 changed files with 380 additions and 394 deletions.
20 changes: 10 additions & 10 deletions src/Rafty/Concensus/States/Candidate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,18 @@ public async Task<AppendEntriesResponse> Handle(AppendEntries appendEntries)
return response.appendEntriesResponse;
}

response = _rules.LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(appendEntries, _log, CurrentState);
response = await _rules.LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(appendEntries, _log, CurrentState);

if(response.shouldReturn)
{
return response.appendEntriesResponse;
}

_rules.DeleteAnyConflictsInLog(appendEntries, _log);
await _rules.DeleteAnyConflictsInLog(appendEntries, _log);

_rules.ApplyEntriesToLog(appendEntries, _log);
await _rules.ApplyEntriesToLog(appendEntries, _log);

var commitIndexAndLastApplied = _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState);
var commitIndexAndLastApplied = await _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState);

await ApplyToStateMachine(commitIndexAndLastApplied.commitIndex, commitIndexAndLastApplied.lastApplied);

Expand Down Expand Up @@ -129,7 +129,7 @@ public async Task<RequestVoteResponse> Handle(RequestVote requestVote)
return response.requestVoteResponse;
}

response = LastLogIndexAndLastLogTermMatchesThis(requestVote);
response = await LastLogIndexAndLastLogTermMatchesThis(requestVote);

if(response.shouldReturn)
{
Expand Down Expand Up @@ -234,7 +234,7 @@ private void ShouldBecomeLeader()

private async Task RequestVote(IPeer peer, BlockingCollection<RequestVoteResponse> requestVoteResponses)
{
var requestVoteResponse = await peer.Request(new RequestVote(CurrentState.CurrentTerm, CurrentState.Id, _log.LastLogIndex, _log.LastLogTerm));
var requestVoteResponse = await peer.Request(new RequestVote(CurrentState.CurrentTerm, CurrentState.Id, await _log.LastLogIndex(), await _log.LastLogTerm()));
requestVoteResponses.Add(requestVoteResponse);
}

Expand Down Expand Up @@ -271,7 +271,7 @@ private async Task ApplyToStateMachine(int commitIndex, int lastApplied)
while (commitIndex > lastApplied)
{
lastApplied++;
var log = _log.Get(lastApplied);
var log = await _log.Get(lastApplied);
await _fsm.Handle(log);
}

Expand Down Expand Up @@ -303,10 +303,10 @@ private void AppendEntriesTermIsGreaterThanCurrentTerm(AppendEntries appendEntri
return (null, false);
}

private (RequestVoteResponse requestVoteResponse, bool shouldReturn) LastLogIndexAndLastLogTermMatchesThis(RequestVote requestVote)
private async Task<(RequestVoteResponse requestVoteResponse, bool shouldReturn)> LastLogIndexAndLastLogTermMatchesThis(RequestVote requestVote)
{
if (requestVote.LastLogIndex == _log.LastLogIndex &&
requestVote.LastLogTerm == _log.LastLogTerm)
if (requestVote.LastLogIndex == await _log.LastLogIndex() &&
requestVote.LastLogTerm == await _log.LastLogTerm())
{
CurrentState = new CurrentState(CurrentState.Id, CurrentState.CurrentTerm, requestVote.CandidateId, CurrentState.CommitIndex, CurrentState.LastApplied, CurrentState.LeaderId);
BecomeFollower();
Expand Down
18 changes: 9 additions & 9 deletions src/Rafty/Concensus/States/Follower.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,18 @@ public async Task<AppendEntriesResponse> Handle(AppendEntries appendEntries)
return response.appendEntriesResponse;
}

response = _rules.LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(appendEntries, _log, CurrentState);
response = await _rules.LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(appendEntries, _log, CurrentState);

if(response.shouldReturn)
{
return response.appendEntriesResponse;
}

_rules.DeleteAnyConflictsInLog(appendEntries, _log);
await _rules.DeleteAnyConflictsInLog(appendEntries, _log);

_rules.ApplyEntriesToLog(appendEntries, _log);
await _rules.ApplyEntriesToLog(appendEntries, _log);

var commitIndexAndLastApplied = _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState);
var commitIndexAndLastApplied = await _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState);

await ApplyToStateMachine(commitIndexAndLastApplied.commitIndex, commitIndexAndLastApplied.lastApplied, appendEntries);

Expand Down Expand Up @@ -100,7 +100,7 @@ public async Task<RequestVoteResponse> Handle(RequestVote requestVote)
return response.requestVoteResponse;
}

response = LastLogIndexAndLastLogTermMatchesThis(requestVote);
response = await LastLogIndexAndLastLogTermMatchesThis(requestVote);

_messagesSinceLastElectionExpiry++;

Expand Down Expand Up @@ -140,10 +140,10 @@ public void Stop()
return (null, false);
}

private (RequestVoteResponse requestVoteResponse, bool shouldReturn) LastLogIndexAndLastLogTermMatchesThis(RequestVote requestVote)
private async Task<(RequestVoteResponse requestVoteResponse, bool shouldReturn)> LastLogIndexAndLastLogTermMatchesThis(RequestVote requestVote)
{
if (requestVote.LastLogIndex == _log.LastLogIndex &&
requestVote.LastLogTerm == _log.LastLogTerm)
if (requestVote.LastLogIndex == await _log.LastLogIndex() &&
requestVote.LastLogTerm == await _log.LastLogTerm())
{
CurrentState = new CurrentState(CurrentState.Id, CurrentState.CurrentTerm, requestVote.CandidateId, CurrentState.CommitIndex, CurrentState.LastApplied, CurrentState.LeaderId);

Expand All @@ -158,7 +158,7 @@ private async Task ApplyToStateMachine(int commitIndex, int lastApplied, AppendE
while (commitIndex > lastApplied)
{
lastApplied++;
var log = _log.Get(lastApplied);
var log = await _log.Get(lastApplied);
await _fsm.Handle(log);
}

Expand Down
54 changes: 27 additions & 27 deletions src/Rafty/Concensus/States/Leader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ public void Stop()

public async Task<Response<T>> Accept<T>(T command) where T : ICommand
{
var indexOfCommand = AddCommandToLog(command);
var indexOfCommand = await AddCommandToLog(command);

var peers = _getPeers(CurrentState);

if(No(peers))
{
var log = _log.Get(indexOfCommand);
var log = await _log.Get(indexOfCommand);
await ApplyToStateMachineAndUpdateCommitIndex(log);
return Ok(command);
}
Expand All @@ -76,7 +76,7 @@ public async Task<AppendEntriesResponse> Handle(AppendEntries appendEntries)
{
if (appendEntries.Term > CurrentState.CurrentTerm)
{
var response = _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState);
var response = await _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState);

await ApplyToStateMachine(appendEntries, response.commitIndex, response.lastApplied);

Expand Down Expand Up @@ -111,7 +111,7 @@ private ConcurrentBag<AppendEntriesResponse> SetUpAppendingEntries()

private async Task<AppendEntriesResponse> GetAppendEntriesResponse(PeerState p, List<(int, LogEntry logEntry)> logsToSend)
{
var appendEntriesResponse = await p.Peer.Request(new AppendEntries(CurrentState.CurrentTerm, CurrentState.Id, _log.LastLogIndex, _log.LastLogTerm, logsToSend.Select(x => x.logEntry).ToList(), CurrentState.CommitIndex));
var appendEntriesResponse = await p.Peer.Request(new AppendEntries(CurrentState.CurrentTerm, CurrentState.Id, await _log.LastLogIndex(), await _log.LastLogTerm(), logsToSend.Select(x => x.logEntry).ToList(), CurrentState.CommitIndex));
return appendEntriesResponse;
}

Expand Down Expand Up @@ -145,7 +145,7 @@ private void UpdateIndexes(PeerState peer, List<(int index, LogEntry logEntry)>
}
}

private void SendAppendEntries()
private async Task SendAppendEntries()
{
if(_appendingEntries == true)
{
Expand All @@ -166,9 +166,9 @@ private void SendAppendEntries()
{
var peersNotInPeerStates = peers.Where(p => !PeerStates.Select(x => x.Peer.Id).Contains(p.Id)).ToList();

peersNotInPeerStates.ForEach(p => {
peersNotInPeerStates.ForEach(async p => {
var matchIndex = new MatchIndex(p, 0);
var nextIndex = new NextIndex(p, _log.LastLogIndex);
var nextIndex = new NextIndex(p, await _log.LastLogIndex());
PeerStates.Add(new PeerState(p, matchIndex, nextIndex));
});
}
Expand All @@ -177,7 +177,7 @@ private void SendAppendEntries()

async Task Do(PeerState peer)
{
var logsToSend = GetLogsForPeer(peer.NextIndex);
var logsToSend = await GetLogsForPeer(peer.NextIndex);

var appendEntriesResponse = await GetAppendEntriesResponse(peer, logsToSend);

Expand All @@ -199,7 +199,7 @@ async Task Do(PeerState peer)
return;
}

UpdateCommitIndex();
await UpdateCommitIndex();
_appendingEntries = false;
}

Expand All @@ -216,15 +216,15 @@ async Task Do(PeerState peer)
return (false, 0);
}

private void UpdateCommitIndex()
private async Task UpdateCommitIndex()
{
var nextCommitIndex = CurrentState.CommitIndex + 1;
var statesIndexOfHighestKnownReplicatedLogs = PeerStates.Select(x => x.MatchIndex.IndexOfHighestKnownReplicatedLog).ToList();
var greaterOrEqualToN = statesIndexOfHighestKnownReplicatedLogs.Where(x => x >= nextCommitIndex).ToList();
var lessThanN = statesIndexOfHighestKnownReplicatedLogs.Where(x => x < nextCommitIndex).ToList();
if (greaterOrEqualToN.Count > lessThanN.Count)
{
if (_log.GetTermAtIndex(nextCommitIndex) == CurrentState.CurrentTerm)
if (await _log.GetTermAtIndex(nextCommitIndex) == CurrentState.CurrentTerm)
{
CurrentState = new CurrentState(CurrentState.Id, CurrentState.CurrentTerm,
CurrentState.VotedFor, nextCommitIndex, CurrentState.LastApplied, CurrentState.LeaderId);
Expand All @@ -235,9 +235,9 @@ private void UpdateCommitIndex()
private void ResetElectionTimer()
{
_electionTimer?.Dispose();
_electionTimer = new Timer(x =>
_electionTimer = new Timer(async x =>
{
SendAppendEntries();
await SendAppendEntries();

}, null, 0, Convert.ToInt32(_settings.HeartbeatTimeout));
}
Expand All @@ -246,17 +246,17 @@ private void InitialisePeerStates()
{
PeerStates = new List<PeerState>();
var peers = _getPeers(CurrentState);
peers.ForEach(p => {
peers.ForEach(async p => {
var matchIndex = new MatchIndex(p, 0);
var nextIndex = new NextIndex(p, _log.LastLogIndex);
var nextIndex = new NextIndex(p, await _log.LastLogIndex());
PeerStates.Add(new PeerState(p, matchIndex, nextIndex));
});
}

private int AddCommandToLog<T>(T command) where T : ICommand
private async Task<int> AddCommandToLog<T>(T command) where T : ICommand
{
var log = new LogEntry(command, command.GetType(), CurrentState.CurrentTerm);
var index = _log.Apply(log);
var index = await _log.Apply(log);
return index;
}

Expand Down Expand Up @@ -293,13 +293,13 @@ private void Wait()
Thread.Sleep(_settings.HeartbeatTimeout);
}

private List<(int index ,LogEntry logEntry)> GetLogsForPeer(NextIndex nextIndex)
private async Task<List<(int index ,LogEntry logEntry)>> GetLogsForPeer(NextIndex nextIndex)
{
if (_log.Count > 0)
if (await _log.Count() > 0)
{
if (_log.LastLogIndex >= nextIndex.NextLogIndexToSendToPeer)
if (await _log.LastLogIndex() >= nextIndex.NextLogIndexToSendToPeer)
{
var logs = _log.GetFrom(nextIndex.NextLogIndexToSendToPeer);
var logs = await _log.GetFrom(nextIndex.NextLogIndexToSendToPeer);
return logs;
}
}
Expand All @@ -325,7 +325,7 @@ private async Task ApplyToStateMachine(AppendEntries appendEntries, int commitIn
while (commitIndex > lastApplied)
{
lastApplied++;
var log = _log.Get(lastApplied);
var log = await _log.Get(lastApplied);
await _fsm.Handle(log);
}

Expand All @@ -351,7 +351,7 @@ private bool No(List<IPeer> peers)
private async Task ApplyToStateMachineAndUpdateCommitIndex(LogEntry log)
{
var nextCommitIndex = CurrentState.CommitIndex + 1;
if (_log.GetTermAtIndex(nextCommitIndex) == CurrentState.CurrentTerm)
if (await _log.GetTermAtIndex(nextCommitIndex) == CurrentState.CurrentTerm)
{
CurrentState = new CurrentState(CurrentState.Id, CurrentState.CurrentTerm,
CurrentState.VotedFor, nextCommitIndex, CurrentState.LastApplied, CurrentState.LeaderId);
Expand All @@ -365,10 +365,10 @@ private OkResponse<T> Ok<T>(T command)
return new OkResponse<T>(command);
}

private ErrorResponse<T> UnableDueToTimeout<T>(T command, int indexOfCommand)
private async Task<ErrorResponse<T>> UnableDueToTimeout<T>(T command, int indexOfCommand)
{
DecrementIndexesOfAnyPeersCommandReplicatedTo(indexOfCommand);
_log.Remove(indexOfCommand);
await _log.Remove(indexOfCommand);
_appendingEntries = false;
return new ErrorResponse<T>("Unable to replicate command to peers due to timeout.", command);
}
Expand All @@ -392,7 +392,7 @@ private async Task<Response<T>> Replicate<T>(T command, int indexOfCommand)
{
if(ReplicationTimeout())
{
return UnableDueToTimeout(command, indexOfCommand);
return await UnableDueToTimeout(command, indexOfCommand);
}

var replicated = 0;
Expand All @@ -406,7 +406,7 @@ private async Task<Response<T>> Replicate<T>(T command, int indexOfCommand)

if (ReplicatedToMajority(replicated))
{
var log = _log.Get(indexOfCommand);
var log = await _log.Get(indexOfCommand);
await _fsm.Handle(log);
FinishWaitingForCommandToReplicate();
break;
Expand Down
25 changes: 13 additions & 12 deletions src/Rafty/Concensus/States/Rules.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
using System;
using System.Threading.Tasks;
using Rafty.Log;

namespace Rafty.Concensus.States
{
public interface IRules
{
(AppendEntriesResponse appendEntriesResponse, bool shouldReturn) AppendEntriesTermIsLessThanCurrentTerm(AppendEntries appendEntries, CurrentState currentState);
(AppendEntriesResponse appendEntriesResponse, bool shouldReturn) LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(AppendEntries appendEntries, ILog log, CurrentState currentState);
void DeleteAnyConflictsInLog(AppendEntries appendEntries, ILog log);
void ApplyEntriesToLog(AppendEntries appendEntries, ILog log);
(int commitIndex, int lastApplied) CommitIndexAndLastApplied(AppendEntries appendEntries, ILog log, CurrentState currentState);
Task<(AppendEntriesResponse appendEntriesResponse, bool shouldReturn)> LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(AppendEntries appendEntries, ILog log, CurrentState currentState);
Task DeleteAnyConflictsInLog(AppendEntries appendEntries, ILog log);
Task ApplyEntriesToLog(AppendEntries appendEntries, ILog log);
Task<(int commitIndex, int lastApplied)> CommitIndexAndLastApplied(AppendEntries appendEntries, ILog log, CurrentState currentState);
(RequestVoteResponse requestVoteResponse, bool shouldReturn) RequestVoteTermIsLessThanCurrentTerm(RequestVote requestVote, CurrentState currentState);
(RequestVoteResponse requestVoteResponse, bool shouldReturn) VotedForIsNotThisOrNobody(RequestVote requestVote, CurrentState currentState);
}
Expand Down Expand Up @@ -39,34 +40,34 @@ public class Rules : IRules
}

// todo - inject as function into candidate and follower as logic is the same...
public (int commitIndex, int lastApplied) CommitIndexAndLastApplied(AppendEntries appendEntries, ILog log, CurrentState currentState)
public async Task<(int commitIndex, int lastApplied)> CommitIndexAndLastApplied(AppendEntries appendEntries, ILog log, CurrentState currentState)
{
var commitIndex = currentState.CommitIndex;
var lastApplied = currentState.LastApplied;
if (appendEntries.LeaderCommitIndex > currentState.CommitIndex)
{
var lastNewEntry = log.LastLogIndex;
var lastNewEntry = await log.LastLogIndex();
commitIndex = System.Math.Min(appendEntries.LeaderCommitIndex, lastNewEntry);
}

return (commitIndex, lastApplied);
}
// todo - inject as function into candidate and follower as logic is the same...
public void ApplyEntriesToLog(AppendEntries appendEntries, ILog log)
public async Task ApplyEntriesToLog(AppendEntries appendEntries, ILog log)
{
foreach (var entry in appendEntries.Entries)
{
log.Apply(entry);
await log.Apply(entry);
}
}

// todo - inject as function into candidate and follower as logic is the same...
public void DeleteAnyConflictsInLog(AppendEntries appendEntries, ILog log)
public async Task DeleteAnyConflictsInLog(AppendEntries appendEntries, ILog log)
{
var count = 1;
foreach (var newLog in appendEntries.Entries)
{
log.DeleteConflictsFromThisLog(appendEntries.PreviousLogIndex + 1, newLog);
await log.DeleteConflictsFromThisLog(appendEntries.PreviousLogIndex + 1, newLog);
count++;
}
}
Expand All @@ -83,9 +84,9 @@ public void DeleteAnyConflictsInLog(AppendEntries appendEntries, ILog log)
}

// todo - inject as function into candidate and follower as logic is the same...
public (AppendEntriesResponse appendEntriesResponse, bool shouldReturn) LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(AppendEntries appendEntries, ILog log, CurrentState currentState)
public async Task<(AppendEntriesResponse appendEntriesResponse, bool shouldReturn)> LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(AppendEntries appendEntries, ILog log, CurrentState currentState)
{
var termAtPreviousLogIndex = log.GetTermAtIndex(appendEntries.PreviousLogIndex);
var termAtPreviousLogIndex = await log.GetTermAtIndex(appendEntries.PreviousLogIndex);
if (termAtPreviousLogIndex > 0 && termAtPreviousLogIndex != appendEntries.PreviousLogTerm)
{
return (new AppendEntriesResponse(currentState.CurrentTerm, false), true);
Expand Down
Loading

0 comments on commit fc66e5a

Please sign in to comment.