diff --git a/README.md b/README.md index 24f4207..faeedaa 100644 --- a/README.md +++ b/README.md @@ -39,12 +39,14 @@ You must implement ISettings which provides a description of each member in the Rafty provides some in memory implementations of its interfaces (you shouldn't use these for anything serious). ```csharp + var log = new InMemoryLog(); var fsm = new InMemoryStateMachine(); var settings = new InMemorySettings(1000, 3500, 50, 5000); var peersProvider = new InMemoryPeersProvider(_peers); var node = new Node(fsm, log, settings, peersProvider); node.Start(); + ``` The above code will get a Rafty node up and running. If the IPeersProvider does not return any IPeers then it will elect itself leader and just run along happily. If something joins the cluster later it will update that new node as the node will get a heartbeat before it can elect itself. Or an election will start! @@ -54,12 +56,14 @@ So in order to get Rafty really running the IPeerProvider needs to return peers. Finally you need to expose the INode interface to some kind of HTTP. I would advise just a plain old .net core web api type thing. These are the methods you need to expose and the transport in your IPeer should hit these URLS (hope that makes some sense). You can look at NodePeer to see how I do this in memory. ```csharp -AppendEntriesResponse Request(AppendEntries appendEntries); -RequestVoteResponse Request(RequestVote requestVote); -Response Request(T command); + +Task Request(AppendEntries appendEntries); +Task Request(RequestVote requestVote); +Task> Request(T command); + ``` -## Further help.. +## Further help The Acceptance and Integration tests will be helpful for anyone who wants to use Rafty. diff --git a/src/Rafty/Concensus/Node/INode.cs b/src/Rafty/Concensus/Node/INode.cs index 535c4fd..c5c889f 100644 --- a/src/Rafty/Concensus/Node/INode.cs +++ b/src/Rafty/Concensus/Node/INode.cs @@ -3,16 +3,18 @@ namespace Rafty.Concensus { + using System.Threading.Tasks; + public interface INode { IState State { get; } void BecomeLeader(CurrentState state); void BecomeFollower(CurrentState state); void BecomeCandidate(CurrentState state); - AppendEntriesResponse Handle(AppendEntries appendEntries); - RequestVoteResponse Handle(RequestVote requestVote); + Task Handle(AppendEntries appendEntries); + Task Handle(RequestVote requestVote); void Start(string id); void Stop(); - Response Accept(T command) where T : ICommand; + Task> Accept(T command) where T : ICommand; } } \ No newline at end of file diff --git a/src/Rafty/Concensus/Node/Node.cs b/src/Rafty/Concensus/Node/Node.cs index d56cc49..0bacd50 100644 --- a/src/Rafty/Concensus/Node/Node.cs +++ b/src/Rafty/Concensus/Node/Node.cs @@ -8,6 +8,8 @@ namespace Rafty.Concensus { + using System.Threading.Tasks; + public class Node : INode { private readonly IFiniteStateMachine _fsm; @@ -72,20 +74,21 @@ public void BecomeFollower(CurrentState state) State = new Follower(state, _fsm, _log, _random, this, _settings, _rules, _getPeers(state)); } - public AppendEntriesResponse Handle(AppendEntries appendEntries) + public async Task Handle(AppendEntries appendEntries) { - return State.Handle(appendEntries); + return await State.Handle(appendEntries); } - public RequestVoteResponse Handle(RequestVote requestVote) + public async Task Handle(RequestVote requestVote) { - return State.Handle(requestVote); + return await State.Handle(requestVote); } - public Response Accept(T command) where T : ICommand + public async Task> Accept(T command) where T : ICommand { - return State.Accept(command); + return await State.Accept(command); } + public void Stop() { State.Stop(); diff --git a/src/Rafty/Concensus/Node/NodePeer.cs b/src/Rafty/Concensus/Node/NodePeer.cs index 98f40d4..e560b7e 100644 --- a/src/Rafty/Concensus/Node/NodePeer.cs +++ b/src/Rafty/Concensus/Node/NodePeer.cs @@ -5,33 +5,24 @@ namespace Rafty.AcceptanceTests { + using System.Threading.Tasks; + public class NodePeer : IPeer { private Node _node; - public string Id - { - get - { - if(_node?.State?.CurrentState?.Id != null) - { - return _node.State.CurrentState.Id; - } - - return default(string); - } - } + public string Id => _node?.State?.CurrentState?.Id; public void SetNode (Node node) { _node = node; } - public RequestVoteResponse Request(RequestVote requestVote) + public async Task Request(RequestVote requestVote) { try { - return _node.Handle(requestVote); + return await _node.Handle(requestVote); } catch(Exception e) { @@ -39,11 +30,11 @@ public RequestVoteResponse Request(RequestVote requestVote) } } - public AppendEntriesResponse Request(AppendEntries appendEntries) + public async Task Request(AppendEntries appendEntries) { try { - return _node.Handle(appendEntries); + return await _node.Handle(appendEntries); } catch(Exception e) { @@ -51,11 +42,11 @@ public AppendEntriesResponse Request(AppendEntries appendEntries) } } - public Response Request(T command) where T : ICommand + public async Task> Request(T command) where T : ICommand { try { - return _node.Accept(command); + return await _node.Accept(command); } catch(Exception e) { diff --git a/src/Rafty/Concensus/Peers/IPeer.cs b/src/Rafty/Concensus/Peers/IPeer.cs index 63e5518..650450a 100644 --- a/src/Rafty/Concensus/Peers/IPeer.cs +++ b/src/Rafty/Concensus/Peers/IPeer.cs @@ -3,6 +3,8 @@ namespace Rafty.Concensus { + using System.Threading.Tasks; + public interface IPeer { /// @@ -13,16 +15,16 @@ public interface IPeer /// /// This will make a requestvote request to the given peer. You must implement the transport. /// - RequestVoteResponse Request(RequestVote requestVote); + Task Request(RequestVote requestVote); /// /// This will make a appendentries request to the given peer. You must implement the transport. /// - AppendEntriesResponse Request(AppendEntries appendEntries); + Task Request(AppendEntries appendEntries); /// /// This will make a command request ot the given peer. You must implement the transport. /// - Response Request(T command) where T : ICommand; + Task> Request(T command) where T : ICommand; } } \ No newline at end of file diff --git a/src/Rafty/Concensus/Peers/Peer.cs b/src/Rafty/Concensus/Peers/Peer.cs index 9cda50d..d414f4b 100644 --- a/src/Rafty/Concensus/Peers/Peer.cs +++ b/src/Rafty/Concensus/Peers/Peer.cs @@ -3,22 +3,23 @@ namespace Rafty.Concensus { using System; + using System.Threading.Tasks; public class Peer : IPeer { public string Id => throw new NotImplementedException(); - public RequestVoteResponse Request(RequestVote requestVote) + public Task Request(RequestVote requestVote) { throw new NotImplementedException(); } - public AppendEntriesResponse Request(AppendEntries appendEntries) + public Task Request(AppendEntries appendEntries) { throw new NotImplementedException(); } - public Response Request(T command) where T : ICommand + public Task> Request(T command) where T : ICommand { throw new NotImplementedException(); } diff --git a/src/Rafty/Concensus/States/Candidate.cs b/src/Rafty/Concensus/States/Candidate.cs index d94f3e6..2590a8e 100644 --- a/src/Rafty/Concensus/States/Candidate.cs +++ b/src/Rafty/Concensus/States/Candidate.cs @@ -75,7 +75,7 @@ public void BeginElection() BecomeFollower(); } - public AppendEntriesResponse Handle(AppendEntries appendEntries) + public async Task Handle(AppendEntries appendEntries) { var response = _rules.AppendEntriesTermIsLessThanCurrentTerm(appendEntries, CurrentState); @@ -84,20 +84,20 @@ public AppendEntriesResponse Handle(AppendEntries appendEntries) return response.appendEntriesResponse; } - response = _rules.LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(appendEntries, _log, CurrentState); + response = await _rules.LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(appendEntries, _log, CurrentState); if(response.shouldReturn) { return response.appendEntriesResponse; } - _rules.DeleteAnyConflictsInLog(appendEntries, _log); + await _rules.DeleteAnyConflictsInLog(appendEntries, _log); - _rules.ApplyEntriesToLog(appendEntries, _log); + await _rules.ApplyEntriesToLog(appendEntries, _log); - var commitIndexAndLastApplied = _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState); + var commitIndexAndLastApplied = await _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState); - ApplyToStateMachine(commitIndexAndLastApplied.commitIndex, commitIndexAndLastApplied.lastApplied); + await ApplyToStateMachine(commitIndexAndLastApplied.commitIndex, commitIndexAndLastApplied.lastApplied); SetLeaderId(appendEntries); @@ -106,7 +106,7 @@ public AppendEntriesResponse Handle(AppendEntries appendEntries) return new AppendEntriesResponse(CurrentState.CurrentTerm, true); } - public RequestVoteResponse Handle(RequestVote requestVote) + public async Task Handle(RequestVote requestVote) { var response = RequestVoteTermIsGreaterThanCurrentTerm(requestVote); @@ -129,7 +129,7 @@ public RequestVoteResponse Handle(RequestVote requestVote) return response.requestVoteResponse; } - response = LastLogIndexAndLastLogTermMatchesThis(requestVote); + response = await LastLogIndexAndLastLogTermMatchesThis(requestVote); if(response.shouldReturn) { @@ -139,7 +139,7 @@ public RequestVoteResponse Handle(RequestVote requestVote) return new RequestVoteResponse(false, CurrentState.CurrentTerm); } - public Response Accept(T command) where T : ICommand + public async Task> Accept(T command) where T : ICommand { return new ErrorResponse("Please retry command later. Currently electing new a new leader.", command); } @@ -234,7 +234,7 @@ private void ShouldBecomeLeader() private async Task RequestVote(IPeer peer, BlockingCollection requestVoteResponses) { - var requestVoteResponse = peer.Request(new RequestVote(CurrentState.CurrentTerm, CurrentState.Id, _log.LastLogIndex, _log.LastLogTerm)); + var requestVoteResponse = await peer.Request(new RequestVote(CurrentState.CurrentTerm, CurrentState.Id, await _log.LastLogIndex(), await _log.LastLogTerm())); requestVoteResponses.Add(requestVoteResponse); } @@ -266,13 +266,13 @@ private void ResetElectionTimer() }, null, Convert.ToInt32(timeout.TotalMilliseconds), Convert.ToInt32(timeout.TotalMilliseconds)); } - private void ApplyToStateMachine(int commitIndex, int lastApplied) + private async Task ApplyToStateMachine(int commitIndex, int lastApplied) { while (commitIndex > lastApplied) { lastApplied++; - var log = _log.Get(lastApplied); - _fsm.Handle(log); + var log = await _log.Get(lastApplied); + await _fsm.Handle(log); } CurrentState = new CurrentState(CurrentState.Id, CurrentState.CurrentTerm, @@ -303,10 +303,10 @@ private void AppendEntriesTermIsGreaterThanCurrentTerm(AppendEntries appendEntri return (null, false); } - private (RequestVoteResponse requestVoteResponse, bool shouldReturn) LastLogIndexAndLastLogTermMatchesThis(RequestVote requestVote) + private async Task<(RequestVoteResponse requestVoteResponse, bool shouldReturn)> LastLogIndexAndLastLogTermMatchesThis(RequestVote requestVote) { - if (requestVote.LastLogIndex == _log.LastLogIndex && - requestVote.LastLogTerm == _log.LastLogTerm) + if (requestVote.LastLogIndex == await _log.LastLogIndex() && + requestVote.LastLogTerm == await _log.LastLogTerm()) { CurrentState = new CurrentState(CurrentState.Id, CurrentState.CurrentTerm, requestVote.CandidateId, CurrentState.CommitIndex, CurrentState.LastApplied, CurrentState.LeaderId); BecomeFollower(); diff --git a/src/Rafty/Concensus/States/Follower.cs b/src/Rafty/Concensus/States/Follower.cs index 5fe30d6..5a4f302 100644 --- a/src/Rafty/Concensus/States/Follower.cs +++ b/src/Rafty/Concensus/States/Follower.cs @@ -46,7 +46,7 @@ public Follower( public CurrentState CurrentState { get; private set;} - public AppendEntriesResponse Handle(AppendEntries appendEntries) + public async Task Handle(AppendEntries appendEntries) { var response = _rules.AppendEntriesTermIsLessThanCurrentTerm(appendEntries, CurrentState); @@ -55,20 +55,20 @@ public AppendEntriesResponse Handle(AppendEntries appendEntries) return response.appendEntriesResponse; } - response = _rules.LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(appendEntries, _log, CurrentState); + response = await _rules.LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(appendEntries, _log, CurrentState); if(response.shouldReturn) { return response.appendEntriesResponse; } - _rules.DeleteAnyConflictsInLog(appendEntries, _log); + await _rules.DeleteAnyConflictsInLog(appendEntries, _log); - _rules.ApplyEntriesToLog(appendEntries, _log); + await _rules.ApplyEntriesToLog(appendEntries, _log); - var commitIndexAndLastApplied = _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState); + var commitIndexAndLastApplied = await _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState); - ApplyToStateMachine(commitIndexAndLastApplied.commitIndex, commitIndexAndLastApplied.lastApplied, appendEntries); + await ApplyToStateMachine(commitIndexAndLastApplied.commitIndex, commitIndexAndLastApplied.lastApplied, appendEntries); SetLeaderId(appendEntries); @@ -77,7 +77,7 @@ public AppendEntriesResponse Handle(AppendEntries appendEntries) return new AppendEntriesResponse(CurrentState.CurrentTerm, true); } - public RequestVoteResponse Handle(RequestVote requestVote) + public async Task Handle(RequestVote requestVote) { var response = RequestVoteTermIsGreaterThanCurrentTerm(requestVote); @@ -100,7 +100,7 @@ public RequestVoteResponse Handle(RequestVote requestVote) return response.requestVoteResponse; } - response = LastLogIndexAndLastLogTermMatchesThis(requestVote); + response = await LastLogIndexAndLastLogTermMatchesThis(requestVote); _messagesSinceLastElectionExpiry++; @@ -112,12 +112,12 @@ public RequestVoteResponse Handle(RequestVote requestVote) return new RequestVoteResponse(false, CurrentState.CurrentTerm); } - public Response Accept(T command) where T : ICommand + public async Task> Accept(T command) where T : ICommand { var leader = _peers.FirstOrDefault(x => x.Id == CurrentState.LeaderId); if(leader != null) { - return leader.Request(command); + return await leader.Request(command); } return new ErrorResponse("Please retry command later. Unable to find leader.", command); @@ -140,10 +140,10 @@ public void Stop() return (null, false); } - private (RequestVoteResponse requestVoteResponse, bool shouldReturn) LastLogIndexAndLastLogTermMatchesThis(RequestVote requestVote) + private async Task<(RequestVoteResponse requestVoteResponse, bool shouldReturn)> LastLogIndexAndLastLogTermMatchesThis(RequestVote requestVote) { - if (requestVote.LastLogIndex == _log.LastLogIndex && - requestVote.LastLogTerm == _log.LastLogTerm) + if (requestVote.LastLogIndex == await _log.LastLogIndex() && + requestVote.LastLogTerm == await _log.LastLogTerm()) { CurrentState = new CurrentState(CurrentState.Id, CurrentState.CurrentTerm, requestVote.CandidateId, CurrentState.CommitIndex, CurrentState.LastApplied, CurrentState.LeaderId); @@ -153,13 +153,13 @@ public void Stop() return (null, false); } - private void ApplyToStateMachine(int commitIndex, int lastApplied, AppendEntries appendEntries) + private async Task ApplyToStateMachine(int commitIndex, int lastApplied, AppendEntries appendEntries) { while (commitIndex > lastApplied) { lastApplied++; - var log = _log.Get(lastApplied); - _fsm.Handle(log); + var log = await _log.Get(lastApplied); + await _fsm.Handle(log); } CurrentState = new CurrentState(CurrentState.Id, appendEntries.Term, diff --git a/src/Rafty/Concensus/States/IState.cs b/src/Rafty/Concensus/States/IState.cs index 78e7913..aa2adeb 100644 --- a/src/Rafty/Concensus/States/IState.cs +++ b/src/Rafty/Concensus/States/IState.cs @@ -2,12 +2,14 @@ namespace Rafty.Concensus { + using System.Threading.Tasks; + public interface IState { CurrentState CurrentState { get; } - AppendEntriesResponse Handle(AppendEntries appendEntries); - RequestVoteResponse Handle(RequestVote requestVote); - Response Accept(T command) where T : ICommand; + Task Handle(AppendEntries appendEntries); + Task Handle(RequestVote requestVote); + Task> Accept(T command) where T : ICommand; void Stop(); } } \ No newline at end of file diff --git a/src/Rafty/Concensus/States/Leader.cs b/src/Rafty/Concensus/States/Leader.cs index 1156166..ab3b8a0 100644 --- a/src/Rafty/Concensus/States/Leader.cs +++ b/src/Rafty/Concensus/States/Leader.cs @@ -56,29 +56,29 @@ public void Stop() _electionTimer.Dispose(); } - public Response Accept(T command) where T : ICommand + public async Task> Accept(T command) where T : ICommand { - var indexOfCommand = AddCommandToLog(command); + var indexOfCommand = await AddCommandToLog(command); var peers = _getPeers(CurrentState); if(No(peers)) { - var log = _log.Get(indexOfCommand); - ApplyToStateMachineAndUpdateCommitIndex(log); + var log = await _log.Get(indexOfCommand); + await ApplyToStateMachineAndUpdateCommitIndex(log); return Ok(command); } - return Replicate(command, indexOfCommand); + return await Replicate(command, indexOfCommand); } - public AppendEntriesResponse Handle(AppendEntries appendEntries) + public async Task Handle(AppendEntries appendEntries) { if (appendEntries.Term > CurrentState.CurrentTerm) { - var response = _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState); + var response = await _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState); - ApplyToStateMachine(appendEntries, response.commitIndex, response.lastApplied); + await ApplyToStateMachine(appendEntries, response.commitIndex, response.lastApplied); SetLeaderId(appendEntries); @@ -90,7 +90,7 @@ public AppendEntriesResponse Handle(AppendEntries appendEntries) return new AppendEntriesResponse(CurrentState.CurrentTerm, false); } - public RequestVoteResponse Handle(RequestVote requestVote) + public async Task Handle(RequestVote requestVote) { var response = RequestVoteTermIsGreaterThanCurrentTerm(requestVote); @@ -109,9 +109,9 @@ private ConcurrentBag SetUpAppendingEntries() return responses; } - private AppendEntriesResponse GetAppendEntriesResponse(PeerState p, List<(int, LogEntry logEntry)> logsToSend) + private async Task GetAppendEntriesResponse(PeerState p, List<(int, LogEntry logEntry)> logsToSend) { - var appendEntriesResponse = p.Peer.Request(new AppendEntries(CurrentState.CurrentTerm, CurrentState.Id, _log.LastLogIndex, _log.LastLogTerm, logsToSend.Select(x => x.logEntry).ToList(), CurrentState.CommitIndex)); + var appendEntriesResponse = await p.Peer.Request(new AppendEntries(CurrentState.CurrentTerm, CurrentState.Id, await _log.LastLogIndex(), await _log.LastLogTerm(), logsToSend.Select(x => x.logEntry).ToList(), CurrentState.CommitIndex)); return appendEntriesResponse; } @@ -145,7 +145,7 @@ private void UpdateIndexes(PeerState peer, List<(int index, LogEntry logEntry)> } } - private void SendAppendEntries() + private async Task SendAppendEntries() { if(_appendingEntries == true) { @@ -166,25 +166,27 @@ private void SendAppendEntries() { var peersNotInPeerStates = peers.Where(p => !PeerStates.Select(x => x.Peer.Id).Contains(p.Id)).ToList(); - peersNotInPeerStates.ForEach(p => { + peersNotInPeerStates.ForEach(async p => { var matchIndex = new MatchIndex(p, 0); - var nextIndex = new NextIndex(p, _log.LastLogIndex); + var nextIndex = new NextIndex(p, await _log.LastLogIndex()); PeerStates.Add(new PeerState(p, matchIndex, nextIndex)); }); } var appendEntriesResponses = SetUpAppendingEntries(); - Parallel.ForEach(PeerStates, peer => + async Task Do(PeerState peer) { - var logsToSend = GetLogsForPeer(peer.NextIndex); - - var appendEntriesResponse = GetAppendEntriesResponse(peer, logsToSend); + var logsToSend = await GetLogsForPeer(peer.NextIndex); + + var appendEntriesResponse = await GetAppendEntriesResponse(peer, logsToSend); appendEntriesResponses.Add(appendEntriesResponse); - + UpdateIndexes(peer, logsToSend, appendEntriesResponse); - }); + } + + Parallel.ForEach(PeerStates, async peer => await Do(peer)); var response = DoesResponseContainsGreaterTerm(appendEntriesResponses); @@ -197,7 +199,7 @@ private void SendAppendEntries() return; } - UpdateCommitIndex(); + await UpdateCommitIndex(); _appendingEntries = false; } @@ -214,7 +216,7 @@ private void SendAppendEntries() return (false, 0); } - private void UpdateCommitIndex() + private async Task UpdateCommitIndex() { var nextCommitIndex = CurrentState.CommitIndex + 1; var statesIndexOfHighestKnownReplicatedLogs = PeerStates.Select(x => x.MatchIndex.IndexOfHighestKnownReplicatedLog).ToList(); @@ -222,7 +224,7 @@ private void UpdateCommitIndex() var lessThanN = statesIndexOfHighestKnownReplicatedLogs.Where(x => x < nextCommitIndex).ToList(); if (greaterOrEqualToN.Count > lessThanN.Count) { - if (_log.GetTermAtIndex(nextCommitIndex) == CurrentState.CurrentTerm) + if (await _log.GetTermAtIndex(nextCommitIndex) == CurrentState.CurrentTerm) { CurrentState = new CurrentState(CurrentState.Id, CurrentState.CurrentTerm, CurrentState.VotedFor, nextCommitIndex, CurrentState.LastApplied, CurrentState.LeaderId); @@ -233,9 +235,9 @@ private void UpdateCommitIndex() private void ResetElectionTimer() { _electionTimer?.Dispose(); - _electionTimer = new Timer(x => + _electionTimer = new Timer(async x => { - SendAppendEntries(); + await SendAppendEntries(); }, null, 0, Convert.ToInt32(_settings.HeartbeatTimeout)); } @@ -244,17 +246,17 @@ private void InitialisePeerStates() { PeerStates = new List(); var peers = _getPeers(CurrentState); - peers.ForEach(p => { + peers.ForEach(async p => { var matchIndex = new MatchIndex(p, 0); - var nextIndex = new NextIndex(p, _log.LastLogIndex); + var nextIndex = new NextIndex(p, await _log.LastLogIndex()); PeerStates.Add(new PeerState(p, matchIndex, nextIndex)); }); } - private int AddCommandToLog(T command) where T : ICommand + private async Task AddCommandToLog(T command) where T : ICommand { var log = new LogEntry(command, command.GetType(), CurrentState.CurrentTerm); - var index = _log.Apply(log); + var index = await _log.Apply(log); return index; } @@ -291,13 +293,13 @@ private void Wait() Thread.Sleep(_settings.HeartbeatTimeout); } - private List<(int index ,LogEntry logEntry)> GetLogsForPeer(NextIndex nextIndex) + private async Task> GetLogsForPeer(NextIndex nextIndex) { - if (_log.Count > 0) + if (await _log.Count() > 0) { - if (_log.LastLogIndex >= nextIndex.NextLogIndexToSendToPeer) + if (await _log.LastLogIndex() >= nextIndex.NextLogIndexToSendToPeer) { - var logs = _log.GetFrom(nextIndex.NextLogIndexToSendToPeer); + var logs = await _log.GetFrom(nextIndex.NextLogIndexToSendToPeer); return logs; } } @@ -318,13 +320,13 @@ private void Wait() return (null, false); } - private void ApplyToStateMachine(AppendEntries appendEntries, int commitIndex, int lastApplied) + private async Task ApplyToStateMachine(AppendEntries appendEntries, int commitIndex, int lastApplied) { while (commitIndex > lastApplied) { lastApplied++; - var log = _log.Get(lastApplied); - _fsm.Handle(log); + var log = await _log.Get(lastApplied); + await _fsm.Handle(log); } CurrentState = new CurrentState(CurrentState.Id, appendEntries.Term, @@ -346,16 +348,16 @@ private bool No(List peers) return false; } - private void ApplyToStateMachineAndUpdateCommitIndex(LogEntry log) + private async Task ApplyToStateMachineAndUpdateCommitIndex(LogEntry log) { var nextCommitIndex = CurrentState.CommitIndex + 1; - if (_log.GetTermAtIndex(nextCommitIndex) == CurrentState.CurrentTerm) + if (await _log.GetTermAtIndex(nextCommitIndex) == CurrentState.CurrentTerm) { CurrentState = new CurrentState(CurrentState.Id, CurrentState.CurrentTerm, CurrentState.VotedFor, nextCommitIndex, CurrentState.LastApplied, CurrentState.LeaderId); } - _fsm.Handle(log); + await _fsm.Handle(log); } private OkResponse Ok(T command) @@ -363,10 +365,10 @@ private OkResponse Ok(T command) return new OkResponse(command); } - private ErrorResponse UnableDueToTimeout(T command, int indexOfCommand) + private async Task> UnableDueToTimeout(T command, int indexOfCommand) { DecrementIndexesOfAnyPeersCommandReplicatedTo(indexOfCommand); - _log.Remove(indexOfCommand); + await _log.Remove(indexOfCommand); _appendingEntries = false; return new ErrorResponse("Unable to replicate command to peers due to timeout.", command); } @@ -382,7 +384,7 @@ private void DecrementIndexesOfAnyPeersCommandReplicatedTo(int indexOfCommand) }); } - private Response Replicate(T command, int indexOfCommand) + private async Task> Replicate(T command, int indexOfCommand) { SetUpReplication(); @@ -390,7 +392,7 @@ private Response Replicate(T command, int indexOfCommand) { if(ReplicationTimeout()) { - return UnableDueToTimeout(command, indexOfCommand); + return await UnableDueToTimeout(command, indexOfCommand); } var replicated = 0; @@ -404,8 +406,8 @@ private Response Replicate(T command, int indexOfCommand) if (ReplicatedToMajority(replicated)) { - var log = _log.Get(indexOfCommand); - _fsm.Handle(log); + var log = await _log.Get(indexOfCommand); + await _fsm.Handle(log); FinishWaitingForCommandToReplicate(); break; } diff --git a/src/Rafty/Concensus/States/Rules.cs b/src/Rafty/Concensus/States/Rules.cs index caae8ea..be2c390 100644 --- a/src/Rafty/Concensus/States/Rules.cs +++ b/src/Rafty/Concensus/States/Rules.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using Rafty.Log; namespace Rafty.Concensus.States @@ -6,10 +7,10 @@ namespace Rafty.Concensus.States public interface IRules { (AppendEntriesResponse appendEntriesResponse, bool shouldReturn) AppendEntriesTermIsLessThanCurrentTerm(AppendEntries appendEntries, CurrentState currentState); - (AppendEntriesResponse appendEntriesResponse, bool shouldReturn) LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(AppendEntries appendEntries, ILog log, CurrentState currentState); - void DeleteAnyConflictsInLog(AppendEntries appendEntries, ILog log); - void ApplyEntriesToLog(AppendEntries appendEntries, ILog log); - (int commitIndex, int lastApplied) CommitIndexAndLastApplied(AppendEntries appendEntries, ILog log, CurrentState currentState); + Task<(AppendEntriesResponse appendEntriesResponse, bool shouldReturn)> LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(AppendEntries appendEntries, ILog log, CurrentState currentState); + Task DeleteAnyConflictsInLog(AppendEntries appendEntries, ILog log); + Task ApplyEntriesToLog(AppendEntries appendEntries, ILog log); + Task<(int commitIndex, int lastApplied)> CommitIndexAndLastApplied(AppendEntries appendEntries, ILog log, CurrentState currentState); (RequestVoteResponse requestVoteResponse, bool shouldReturn) RequestVoteTermIsLessThanCurrentTerm(RequestVote requestVote, CurrentState currentState); (RequestVoteResponse requestVoteResponse, bool shouldReturn) VotedForIsNotThisOrNobody(RequestVote requestVote, CurrentState currentState); } @@ -39,34 +40,34 @@ public class Rules : IRules } // todo - inject as function into candidate and follower as logic is the same... - public (int commitIndex, int lastApplied) CommitIndexAndLastApplied(AppendEntries appendEntries, ILog log, CurrentState currentState) + public async Task<(int commitIndex, int lastApplied)> CommitIndexAndLastApplied(AppendEntries appendEntries, ILog log, CurrentState currentState) { var commitIndex = currentState.CommitIndex; var lastApplied = currentState.LastApplied; if (appendEntries.LeaderCommitIndex > currentState.CommitIndex) { - var lastNewEntry = log.LastLogIndex; + var lastNewEntry = await log.LastLogIndex(); commitIndex = System.Math.Min(appendEntries.LeaderCommitIndex, lastNewEntry); } return (commitIndex, lastApplied); } // todo - inject as function into candidate and follower as logic is the same... - public void ApplyEntriesToLog(AppendEntries appendEntries, ILog log) + public async Task ApplyEntriesToLog(AppendEntries appendEntries, ILog log) { foreach (var entry in appendEntries.Entries) { - log.Apply(entry); + await log.Apply(entry); } } // todo - inject as function into candidate and follower as logic is the same... - public void DeleteAnyConflictsInLog(AppendEntries appendEntries, ILog log) + public async Task DeleteAnyConflictsInLog(AppendEntries appendEntries, ILog log) { var count = 1; foreach (var newLog in appendEntries.Entries) { - log.DeleteConflictsFromThisLog(appendEntries.PreviousLogIndex + 1, newLog); + await log.DeleteConflictsFromThisLog(appendEntries.PreviousLogIndex + 1, newLog); count++; } } @@ -83,9 +84,9 @@ public void DeleteAnyConflictsInLog(AppendEntries appendEntries, ILog log) } // todo - inject as function into candidate and follower as logic is the same... - public (AppendEntriesResponse appendEntriesResponse, bool shouldReturn) LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(AppendEntries appendEntries, ILog log, CurrentState currentState) + public async Task<(AppendEntriesResponse appendEntriesResponse, bool shouldReturn)> LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(AppendEntries appendEntries, ILog log, CurrentState currentState) { - var termAtPreviousLogIndex = log.GetTermAtIndex(appendEntries.PreviousLogIndex); + var termAtPreviousLogIndex = await log.GetTermAtIndex(appendEntries.PreviousLogIndex); if (termAtPreviousLogIndex > 0 && termAtPreviousLogIndex != appendEntries.PreviousLogTerm) { return (new AppendEntriesResponse(currentState.CurrentTerm, false), true); diff --git a/src/Rafty/FiniteStateMachine/IFiniteStateMachine.cs b/src/Rafty/FiniteStateMachine/IFiniteStateMachine.cs index 4b86e34..9e0acbf 100644 --- a/src/Rafty/FiniteStateMachine/IFiniteStateMachine.cs +++ b/src/Rafty/FiniteStateMachine/IFiniteStateMachine.cs @@ -9,6 +9,6 @@ public interface IFiniteStateMachine /// /// This will apply the given log to the state machine. /// - void Handle(LogEntry log); + Task Handle(LogEntry log); } } \ No newline at end of file diff --git a/src/Rafty/FiniteStateMachine/InMemoryStateMachine.cs b/src/Rafty/FiniteStateMachine/InMemoryStateMachine.cs index e4ec6d3..700b438 100644 --- a/src/Rafty/FiniteStateMachine/InMemoryStateMachine.cs +++ b/src/Rafty/FiniteStateMachine/InMemoryStateMachine.cs @@ -1,3 +1,4 @@ +using System.Threading.Tasks; using Rafty.Log; namespace Rafty.FiniteStateMachine @@ -6,9 +7,10 @@ public class InMemoryStateMachine : IFiniteStateMachine { public int HandledLogEntries {get;private set;} - public void Handle(LogEntry log) + public Task Handle(LogEntry log) { HandledLogEntries++; + return Task.CompletedTask; } } } \ No newline at end of file diff --git a/src/Rafty/Infrastructure/Waiter.cs b/src/Rafty/Infrastructure/Waiter.cs index 8a7f52e..a46fae0 100644 --- a/src/Rafty/Infrastructure/Waiter.cs +++ b/src/Rafty/Infrastructure/Waiter.cs @@ -7,6 +7,8 @@ namespace Rafty.Infrastructure { + using System.Threading.Tasks; + public class Wait { public static Waiter WaitFor(int milliSeconds) @@ -40,6 +42,22 @@ public bool Until(Func condition) return passed; } + public async Task Until(Func> condition) + { + var stopwatch = Stopwatch.StartNew(); + var passed = false; + while (stopwatch.ElapsedMilliseconds < _milliSeconds) + { + if (await condition.Invoke()) + { + passed = true; + break; + } + } + + return passed; + } + public bool Until(Func condition) { var stopwatch = Stopwatch.StartNew(); diff --git a/src/Rafty/Log/ILog.cs b/src/Rafty/Log/ILog.cs index dd80810..8633097 100644 --- a/src/Rafty/Log/ILog.cs +++ b/src/Rafty/Log/ILog.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading.Tasks; namespace Rafty.Log { @@ -8,39 +9,39 @@ public interface ILog /// /// This will apply a log entry and return its index /// - int Apply(LogEntry log); + Task Apply(LogEntry log); /// /// This will return the log entry at the index passed in /// - LogEntry Get(int index); + Task Get(int index); /// /// This will return all the log entries from a certain point based on index including the first match on the index passed in /// - List<(int index, LogEntry logEntry)> GetFrom(int index); + Task> GetFrom(int index); /// /// This will return the last known log index or 1 /// - int LastLogIndex {get;} + Task LastLogIndex(); /// /// This will return the last know log term or 0 /// - long LastLogTerm {get;} + Task LastLogTerm(); /// /// This will get the term at the index passed in /// - long GetTermAtIndex(int index); + Task GetTermAtIndex(int index); /// /// This will delete any conflicts from the log, if the log entry passed in doesnt match the log entry //in the log for the given index it will also delete any further logs /// - void DeleteConflictsFromThisLog(int index, LogEntry logEntry); + Task DeleteConflictsFromThisLog(int index, LogEntry logEntry); /// /// This returns a count of the logs /// - int Count { get; } + Task Count(); /// /// This removes the command at the index passed in. /// - void Remove(int indexOfCommand); + Task Remove(int indexOfCommand); } } \ No newline at end of file diff --git a/src/Rafty/Log/InMemoryLog.cs b/src/Rafty/Log/InMemoryLog.cs index 55cf39a..31c1815 100644 --- a/src/Rafty/Log/InMemoryLog.cs +++ b/src/Rafty/Log/InMemoryLog.cs @@ -4,6 +4,7 @@ namespace Rafty.Log { using System; using System.Collections.Generic; + using System.Threading.Tasks; using Rafty.Infrastructure; public class InMemoryLog : ILog @@ -17,7 +18,7 @@ public InMemoryLog() public Dictionary ExposedForTesting => _log; - public List<(int index, LogEntry logEntry)> GetFrom(int index) + public Task> GetFrom(int index) { var logsToReturn = new List<(int, LogEntry)>(); @@ -29,64 +30,58 @@ public InMemoryLog() } } - return logsToReturn; + return Task.FromResult(logsToReturn); } - public int LastLogIndex + public Task LastLogIndex() { - get - { var lastLog = _log.LastOrDefault(); if(lastLog.Value != null) { - return lastLog.Key; + return Task.FromResult(lastLog.Key); } - return 1; - } + return Task.FromResult(1); } - public long LastLogTerm + public Task LastLogTerm() { - get - { - var lastLog = _log.LastOrDefault(); + var lastLog = _log.LastOrDefault(); if(lastLog.Value != null) { - return lastLog.Value.Term; + return Task.FromResult(lastLog.Value.Term); } - return 0; - } + return Task.FromResult((long)0); } - public int Apply(LogEntry logEntry) + public Task Apply(LogEntry logEntry) { if(_log.Count <= 0) { _log.Add(1, logEntry); - return 1; + return Task.FromResult(1); } else { var nextIndex = _log.Max(x => x.Key) + 1; _log.Add(nextIndex, logEntry); - return nextIndex; + return Task.FromResult(nextIndex); } } - public long GetTermAtIndex(int index) + public Task GetTermAtIndex(int index) { if(_log.Count == 0) { - return 0; + return Task.FromResult((long)0); } if(index > _log.Count) { - return 0; + return Task.FromResult((long)0); } if (index <= 0) @@ -94,14 +89,14 @@ public long GetTermAtIndex(int index) throw new Exception("Log starts at 1..."); } - return _log[index].Term; + return Task.FromResult(_log[index].Term); } - public void DeleteConflictsFromThisLog(int logIndex, LogEntry logEntry) + public Task DeleteConflictsFromThisLog(int logIndex, LogEntry logEntry) { if(logIndex > 1 && logIndex > _log.Count -1) { - return; + return Task.CompletedTask;; } for (int i = logIndex; i <= _log.Max(x => x.Key); i++) @@ -114,20 +109,24 @@ public void DeleteConflictsFromThisLog(int logIndex, LogEntry logEntry) break; } } + + return Task.CompletedTask; } - private void RemoveRange(int indexToRemove, int toRemove) + private Task RemoveRange(int indexToRemove, int toRemove) { while(_log.ContainsKey(indexToRemove)) { _log.Remove(indexToRemove); indexToRemove++; } + + return Task.CompletedTask; } - public int Count => _log.Count; + public Task Count() => Task.FromResult(_log.Count); - public LogEntry Get(int index) + public Task Get(int index) { if (index <= 0) { @@ -136,15 +135,16 @@ public LogEntry Get(int index) if(_log.Count >= index) { - return _log[index]; + return Task.FromResult(_log[index]); } throw new Exception("Nothing in log.."); } - public void Remove(int indexOfCommand) + public Task Remove(int indexOfCommand) { _log.Remove(indexOfCommand); + return Task.CompletedTask; } } } \ No newline at end of file diff --git a/test/Rafty.AcceptanceTests/Tests.cs b/test/Rafty.AcceptanceTests/Tests.cs index 6d28461..96786b4 100644 --- a/test/Rafty.AcceptanceTests/Tests.cs +++ b/test/Rafty.AcceptanceTests/Tests.cs @@ -16,6 +16,8 @@ [assembly: CollectionBehavior(DisableTestParallelization = true)] namespace Rafty.AcceptanceTests { + using System.Threading.Tasks; + public class Tests { private readonly ConcurrentDictionary _servers; @@ -50,20 +52,20 @@ public void ShouldRunInSoloModeThenAddNewServersThatBecomeFollowers() } [Fact] - public void ShouldRunInSoloModeAcceptCommandThenAddNewServersThatBecomeFollowersAndCommandsWorkForAllServers() + public async Task ShouldRunInSoloModeAcceptCommandThenAddNewServersThatBecomeFollowersAndCommandsWorkForAllServers() { CreateServers(1); AssignNodesToPeers(); StartNodes(); AssertLeaderElected(0); - SendCommandToLeader(); + await SendCommandToLeader(); AddNewServers(4); AssertLeaderElected(4); - AssertCommandAccepted(1, 4); + await AssertCommandAccepted(1, 4); } [Fact] - public void ShouldRunInSoloModeThenAddNewServersThatBecomeFollowersAndCommandsWorkForAllServers() + public async Task ShouldRunInSoloModeThenAddNewServersThatBecomeFollowersAndCommandsWorkForAllServers() { CreateServers(1); AssignNodesToPeers(); @@ -71,8 +73,8 @@ public void ShouldRunInSoloModeThenAddNewServersThatBecomeFollowersAndCommandsWo AssertLeaderElected(0); AddNewServers(4); AssertLeaderElected(4); - SendCommandToLeader(); - AssertCommandAccepted(1, 4); + await SendCommandToLeader(); + await AssertCommandAccepted(1, 4); } [Fact] @@ -116,59 +118,59 @@ public void ShouldAllowPreviousLeaderBackIntoTheCluster() } [Fact] - public void LeaderShouldAcceptCommandThenPersistToFollowersAndApplyToStateMachine() + public async Task LeaderShouldAcceptCommandThenPersistToFollowersAndApplyToStateMachine() { CreateServers(5); AssignNodesToPeers(); StartNodes(); AssertLeaderElected(4); - SendCommandToLeader(); - AssertCommandAccepted(1, 4); + await SendCommandToLeader(); + await AssertCommandAccepted(1, 4); } [Fact] - public void FollowerShouldForwardCommandToLeaderThenPersistToFollowersAndApplyToStateMachine() + public async Task FollowerShouldForwardCommandToLeaderThenPersistToFollowersAndApplyToStateMachine() { CreateServers(5); AssignNodesToPeers(); StartNodes(); AssertLeaderElected(4); - SendCommandToFollower(); - AssertCommandAccepted(1, 4); + await SendCommandToFollower(); + await AssertCommandAccepted(1, 4); } [Fact] - public void LeaderShouldAcceptManyCommandsThenPersistToFollowersAndApplyToStateMachine() + public async Task LeaderShouldAcceptManyCommandsThenPersistToFollowersAndApplyToStateMachine() { CreateServers(5); AssignNodesToPeers(); StartNodes(); AssertLeaderElected(4); - SendCommandToLeader(); - AssertCommandAccepted(1, 4); - SendCommandToLeader(); - AssertCommandAccepted(2, 4); - SendCommandToLeader(); - AssertCommandAccepted(3, 4); - SendCommandToLeader(); - AssertCommandAccepted(4, 4); + await SendCommandToLeader(); + await AssertCommandAccepted(1, 4); + await SendCommandToLeader(); + await AssertCommandAccepted(2, 4); + await SendCommandToLeader(); + await AssertCommandAccepted(3, 4); + await SendCommandToLeader(); + await AssertCommandAccepted(4, 4); } [Fact] - public void ShouldCatchUpIfNodeDies() + public async Task ShouldCatchUpIfNodeDies() { CreateServers(5); AssignNodesToPeers(); StartNodes(); KillTheLeader(); AssertLeaderElected(3); - SendCommandToLeader(); - AssertCommandAccepted(1, 3); + await SendCommandToLeader(); + await AssertCommandAccepted(1, 3); BringPreviousLeaderBackToLife(); AssertLeaderElected(4); - AssertCommandAccepted(1, 4); - SendCommandToLeader(); - AssertCommandAccepted(2, 4); + await AssertCommandAccepted(1, 4); + await SendCommandToLeader(); + await AssertCommandAccepted(2, 4); } private void AddNewServers(int count) @@ -190,20 +192,20 @@ private void AddNewServers(int count) } } - private void SendCommandToLeader() + private async Task SendCommandToLeader() { var leaderServer = GetLeader(); var command = new FakeCommand(); - leaderServer.Value.Node.Accept(command); + await leaderServer.Value.Node.Accept(command); } - private void SendCommandToFollower() + private async Task SendCommandToFollower() { - bool SendCommand() + async Task SendCommand() { var followerServer = _servers.First(x => x.Value.Node.State is Follower); var command = new FakeCommand(); - var response = followerServer.Value.Node.Accept(command); + var response = await followerServer.Value.Node.Accept(command); if (response is ErrorResponse) { return false; @@ -212,23 +214,23 @@ bool SendCommand() return true; } - var sentCommand = WaitFor(25000).Until(SendCommand); + var sentCommand = await WaitFor(25000).Until(async () => await SendCommand()); sentCommand.ShouldBeTrue(); } - private void AssertCommandAccepted(int expectedReplicatedCount, int expectedFollowers) + private async Task AssertCommandAccepted(int expectedReplicatedCount, int expectedFollowers) { - bool IsReplicatedToLeader(KeyValuePair server) + async Task IsReplicatedToLeader(KeyValuePair server) { - return server.Value.Log.Count == expectedReplicatedCount && + return await server.Value.Log.Count() == expectedReplicatedCount && server.Value.Fsm.HandledLogEntries == expectedReplicatedCount; } var leaderServer = GetLeader(); - var appliedToLeaderFsm = WaitFor(25000).Until(() => IsReplicatedToLeader(leaderServer)); + var appliedToLeaderFsm = await WaitFor(25000).Until(() => IsReplicatedToLeader(leaderServer)); appliedToLeaderFsm.ShouldBeTrue(); - bool IsReplicatedToFollowers() + async Task IsReplicatedToFollowers() { var followers = _servers .Select(x => x.Value) @@ -242,7 +244,7 @@ bool IsReplicatedToFollowers() foreach(var follower in followers) { - if(follower.Log.Count != expectedReplicatedCount) + if(await follower.Log.Count() != expectedReplicatedCount) { return false; } @@ -256,7 +258,7 @@ bool IsReplicatedToFollowers() return true; } - var appliedToFollowersFsm = WaitFor(25000).Until(IsReplicatedToFollowers); + var appliedToFollowersFsm = await WaitFor(25000).Until(() => IsReplicatedToFollowers()); appliedToFollowersFsm.ShouldBeTrue(); } @@ -280,7 +282,7 @@ bool LeaderElected() return leader.Value != null; } - var leaderElectedAndCommandReceived = WaitFor(20000).Until(LeaderElected); + var leaderElectedAndCommandReceived = WaitFor(20000).Until(() => LeaderElected()); leaderElectedAndCommandReceived.ShouldBeTrue(); var leaderServer = GetLeader(); @@ -323,7 +325,7 @@ bool LeaderElected() return false; } - var leaderElected = WaitFor(20000).Until(LeaderElected); + var leaderElected = WaitFor(20000).Until(() => LeaderElected()); leaderElected.ShouldBeTrue(); } diff --git a/test/Rafty.IntegrationTests/FileFsm.cs b/test/Rafty.IntegrationTests/FileFsm.cs index 30fd412..d442ffd 100644 --- a/test/Rafty.IntegrationTests/FileFsm.cs +++ b/test/Rafty.IntegrationTests/FileFsm.cs @@ -1,5 +1,6 @@ using System; using System.IO; +using System.Threading.Tasks; using Newtonsoft.Json; using Rafty.FiniteStateMachine; using Rafty.Infrastructure; @@ -9,18 +10,19 @@ namespace Rafty.IntegrationTests { public class FileFsm : IFiniteStateMachine { - private string _id; + private readonly string _id; + public FileFsm(NodeId nodeId) { _id = nodeId.Id; } - public void Handle(LogEntry log) + public async Task Handle(LogEntry log) { try { var json = JsonConvert.SerializeObject(log.CommandData); - File.AppendAllText(_id.Replace("/","").Replace(":","").ToString(), json); + await File.AppendAllTextAsync(_id.Replace("/","").Replace(":","").ToString(), json); } catch(Exception exception) { diff --git a/test/Rafty.IntegrationTests/HttpPeer.cs b/test/Rafty.IntegrationTests/HttpPeer.cs index 3f25082..5401be7 100644 --- a/test/Rafty.IntegrationTests/HttpPeer.cs +++ b/test/Rafty.IntegrationTests/HttpPeer.cs @@ -6,6 +6,8 @@ namespace Rafty.IntegrationTests { + using System.Threading.Tasks; + public class HttpPeer : IPeer { private string _hostAndPort; @@ -24,14 +26,14 @@ public HttpPeer(string hostAndPort, HttpClient httpClient) public string Id {get; private set;} - public RequestVoteResponse Request(RequestVote requestVote) + public async Task Request(RequestVote requestVote) { var json = JsonConvert.SerializeObject(requestVote, _jsonSerializerSettings); var content = new StringContent(json); - var response = _httpClient.PostAsync($"{_hostAndPort}/requestvote", content).GetAwaiter().GetResult(); + var response = await _httpClient.PostAsync($"{_hostAndPort}/requestvote", content); if(response.IsSuccessStatusCode) { - return JsonConvert.DeserializeObject(response.Content.ReadAsStringAsync().GetAwaiter().GetResult()); + return JsonConvert.DeserializeObject(await response.Content.ReadAsStringAsync()); } else { @@ -39,16 +41,16 @@ public RequestVoteResponse Request(RequestVote requestVote) } } - public AppendEntriesResponse Request(AppendEntries appendEntries) + public async Task Request(AppendEntries appendEntries) { try { var json = JsonConvert.SerializeObject(appendEntries, _jsonSerializerSettings); var content = new StringContent(json); - var response = _httpClient.PostAsync($"{_hostAndPort}/appendEntries", content).GetAwaiter().GetResult(); + var response = await _httpClient.PostAsync($"{_hostAndPort}/appendEntries", content); if(response.IsSuccessStatusCode) { - return JsonConvert.DeserializeObject(response.Content.ReadAsStringAsync().GetAwaiter().GetResult()); + return JsonConvert.DeserializeObject(await response.Content.ReadAsStringAsync()); } else { @@ -62,18 +64,18 @@ public AppendEntriesResponse Request(AppendEntries appendEntries) } } - public Response Request(T command) where T : ICommand + public async Task> Request(T command) where T : ICommand { var json = JsonConvert.SerializeObject(command, _jsonSerializerSettings); var content = new StringContent(json); - var response = _httpClient.PostAsync($"{_hostAndPort}/command", content).GetAwaiter().GetResult(); + var response = await _httpClient.PostAsync($"{_hostAndPort}/command", content); if(response.IsSuccessStatusCode) { - return JsonConvert.DeserializeObject>(response.Content.ReadAsStringAsync().GetAwaiter().GetResult()); + return JsonConvert.DeserializeObject>(await response.Content.ReadAsStringAsync()); } else { - return new ErrorResponse(response.Content.ReadAsStringAsync().GetAwaiter().GetResult(), command); + return new ErrorResponse(await response.Content.ReadAsStringAsync(), command); } } } diff --git a/test/Rafty.IntegrationTests/SqlLiteLog.cs b/test/Rafty.IntegrationTests/SqlLiteLog.cs index 3d1d953..4e6eab3 100644 --- a/test/Rafty.IntegrationTests/SqlLiteLog.cs +++ b/test/Rafty.IntegrationTests/SqlLiteLog.cs @@ -5,24 +5,24 @@ using System; using Rafty.Infrastructure; using System.Collections.Generic; +using System.Threading.Tasks; +using System.Threading; namespace Rafty.IntegrationTests { public class SqlLiteLog : ILog { private string _path; - private readonly object _lock = new object(); + private readonly SemaphoreSlim _sempaphore = new SemaphoreSlim(1,1); public SqlLiteLog(NodeId nodeId) { _path = $"{nodeId.Id.Replace("/","").Replace(":","").ToString()}.db"; if(!File.Exists(_path)) { - lock(_lock) - { - FileStream fs = File.Create(_path); - fs.Dispose(); - } + FileStream fs = File.Create(_path); + fs.Dispose(); + using(var connection = new SqliteConnection($"Data Source={_path};")) { connection.Open(); @@ -38,241 +38,222 @@ data text not null } } - public int LastLogIndex + public async Task LastLogIndex() { - get + _sempaphore.Wait(); + var result = 1; + using(var connection = new SqliteConnection($"Data Source={_path};")) { - lock(_lock) + connection.Open(); + var sql = @"select id from logs order by id desc limit 1"; + using(var command = new SqliteCommand(sql, connection)) { - var result = 1; - using(var connection = new SqliteConnection($"Data Source={_path};")) + var index = Convert.ToInt32(await command.ExecuteScalarAsync()); + if(index > result) { - connection.Open(); - var sql = @"select id from logs order by id desc limit 1"; - using(var command = new SqliteCommand(sql, connection)) - { - var index = Convert.ToInt32(command.ExecuteScalar()); - if(index > result) - { - result = index; - } - } + result = index; } - return result; } } + + _sempaphore.Release(); + return result; } - public long LastLogTerm + public async Task LastLogTerm() { - get + _sempaphore.Wait(); + long result = 0; + using(var connection = new SqliteConnection($"Data Source={_path};")) { - lock(_lock) + connection.Open(); + var sql = @"select data from logs order by id desc limit 1"; + using(var command = new SqliteCommand(sql, connection)) { - long result = 0; - using(var connection = new SqliteConnection($"Data Source={_path};")) + var data = Convert.ToString(await command.ExecuteScalarAsync()); + var jsonSerializerSettings = new JsonSerializerSettings() { + TypeNameHandling = TypeNameHandling.All + }; + var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); + if(log != null && log.Term > result) { - connection.Open(); - var sql = @"select data from logs order by id desc limit 1"; - using(var command = new SqliteCommand(sql, connection)) - { - var data = Convert.ToString(command.ExecuteScalar()); - var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); - if(log != null && log.Term > result) - { - result = log.Term; - } - } + result = log.Term; } - return result; } } + _sempaphore.Release(); + return result; } - public int Count + public async Task Count() { - get + _sempaphore.Wait(); + var result = 0; + using(var connection = new SqliteConnection($"Data Source={_path};")) { - lock(_lock) + connection.Open(); + var sql = @"select count(id) from logs"; + using(var command = new SqliteCommand(sql, connection)) { - var result = 0; - using(var connection = new SqliteConnection($"Data Source={_path};")) + var index = Convert.ToInt32(await command.ExecuteScalarAsync()); + if(index > result) { - connection.Open(); - var sql = @"select count(id) from logs"; - using(var command = new SqliteCommand(sql, connection)) - { - var index = Convert.ToInt32(command.ExecuteScalar()); - if(index > result) - { - result = index; - } - } + result = index; } - return result; } } + _sempaphore.Release(); + return result; } - public int Apply(LogEntry log) + public async Task Apply(LogEntry log) { - lock(_lock) + _sempaphore.Wait(); + using(var connection = new SqliteConnection($"Data Source={_path};")) { - using(var connection = new SqliteConnection($"Data Source={_path};")) + connection.Open(); + var jsonSerializerSettings = new JsonSerializerSettings() { + TypeNameHandling = TypeNameHandling.All + }; + var data = JsonConvert.SerializeObject(log, jsonSerializerSettings); + //todo - sql injection dont copy this.. + var sql = $"insert into logs (data) values ('{data}')"; + using(var command = new SqliteCommand(sql, connection)) { - connection.Open(); - var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - var data = JsonConvert.SerializeObject(log, jsonSerializerSettings); - //todo - sql injection dont copy this.. - var sql = $"insert into logs (data) values ('{data}')"; - using(var command = new SqliteCommand(sql, connection)) - { - var result = command.ExecuteNonQuery(); - } - - sql = "select last_insert_rowid()"; - using(var command = new SqliteCommand(sql, connection)) - { - var result = command.ExecuteScalar(); - return Convert.ToInt32(result); - } + var result = await command.ExecuteNonQueryAsync(); } + + sql = "select last_insert_rowid()"; + using(var command = new SqliteCommand(sql, connection)) + { + var result = await command.ExecuteScalarAsync(); + _sempaphore.Release(); + return Convert.ToInt32(result); + } } } - public void DeleteConflictsFromThisLog(int index, LogEntry logEntry) + public async Task DeleteConflictsFromThisLog(int index, LogEntry logEntry) { - lock(_lock) + _sempaphore.Wait(); + using(var connection = new SqliteConnection($"Data Source={_path};")) { - using(var connection = new SqliteConnection($"Data Source={_path};")) + connection.Open(); + //todo - sql injection dont copy this.. + var sql = $"select data from logs where id = {index};"; + using(var command = new SqliteCommand(sql, connection)) { - connection.Open(); - //todo - sql injection dont copy this.. - var sql = $"select data from logs where id = {index};"; - using(var command = new SqliteCommand(sql, connection)) + var data = Convert.ToString(await command.ExecuteScalarAsync()); + var jsonSerializerSettings = new JsonSerializerSettings() { + TypeNameHandling = TypeNameHandling.All + }; + var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); + if(logEntry != null && log != null && logEntry.Term != log.Term) { - var data = Convert.ToString(command.ExecuteScalar()); - var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); - if(logEntry != null && log != null && logEntry.Term != log.Term) + //todo - sql injection dont copy this.. + var deleteSql = $"delete from logs where id >= {index};"; + using(var deleteCommand = new SqliteCommand(deleteSql, connection)) { - //todo - sql injection dont copy this.. - var deleteSql = $"delete from logs where id >= {index};"; - using(var deleteCommand = new SqliteCommand(deleteSql, connection)) - { - var result = deleteCommand.ExecuteNonQuery(); - } + var result = await deleteCommand.ExecuteNonQueryAsync(); } } } } + _sempaphore.Release(); } - public LogEntry Get(int index) + public async Task Get(int index) { - lock(_lock) + _sempaphore.Wait(); + using(var connection = new SqliteConnection($"Data Source={_path};")) { - using(var connection = new SqliteConnection($"Data Source={_path};")) + connection.Open(); + //todo - sql injection dont copy this.. + var sql = $"select data from logs where id = {index}"; + using(var command = new SqliteCommand(sql, connection)) { - connection.Open(); - //todo - sql injection dont copy this.. - var sql = $"select data from logs where id = {index}"; - using(var command = new SqliteCommand(sql, connection)) - { - var data = Convert.ToString(command.ExecuteScalar()); - var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); - return log; - } + var data = Convert.ToString(await command.ExecuteScalarAsync()); + var jsonSerializerSettings = new JsonSerializerSettings() { + TypeNameHandling = TypeNameHandling.All + }; + var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); + _sempaphore.Release(); + return log; } } } - public System.Collections.Generic.List<(int index, LogEntry logEntry)> GetFrom(int index) + public async Task> GetFrom(int index) { - lock(_lock) - { - var logsToReturn = new List<(int, LogEntry)>(); + _sempaphore.Wait(); + var logsToReturn = new List<(int, LogEntry)>(); - using(var connection = new SqliteConnection($"Data Source={_path};")) + using(var connection = new SqliteConnection($"Data Source={_path};")) + { + connection.Open(); + //todo - sql injection dont copy this.. + var sql = $"select id, data from logs where id >= {index}"; + using(var command = new SqliteCommand(sql, connection)) { - connection.Open(); - //todo - sql injection dont copy this.. - var sql = $"select id, data from logs where id >= {index}"; - using(var command = new SqliteCommand(sql, connection)) + using(var reader = await command.ExecuteReaderAsync()) { - using(var reader = command.ExecuteReader()) + while(reader.Read()) { - while(reader.Read()) - { - var id = Convert.ToInt32(reader[0]); - var data = (string)reader[1]; - var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); - logsToReturn.Add((id, log)); + var id = Convert.ToInt32(reader[0]); + var data = (string)reader[1]; + var jsonSerializerSettings = new JsonSerializerSettings() { + TypeNameHandling = TypeNameHandling.All + }; + var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); + logsToReturn.Add((id, log)); - } } } } - - return logsToReturn; + _sempaphore.Release(); + return logsToReturn; } - } - public long GetTermAtIndex(int index) + public async Task GetTermAtIndex(int index) { - lock(_lock) + _sempaphore.Wait(); + long result = 0; + using(var connection = new SqliteConnection($"Data Source={_path};")) { - long result = 0; - using(var connection = new SqliteConnection($"Data Source={_path};")) + connection.Open(); + //todo - sql injection dont copy this.. + var sql = $"select data from logs where id = {index}"; + using(var command = new SqliteCommand(sql, connection)) { - connection.Open(); - //todo - sql injection dont copy this.. - var sql = $"select data from logs where id = {index}"; - using(var command = new SqliteCommand(sql, connection)) + var data = Convert.ToString(await command.ExecuteScalarAsync()); + var jsonSerializerSettings = new JsonSerializerSettings() { + TypeNameHandling = TypeNameHandling.All + }; + var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); + if(log != null && log.Term > result) { - var data = Convert.ToString(command.ExecuteScalar()); - var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); - if(log != null && log.Term > result) - { - result = log.Term; - } + result = log.Term; } } - return result; } + _sempaphore.Release(); + return result; } - public void Remove(int indexOfCommand) + public async Task Remove(int indexOfCommand) { - lock(_lock) + _sempaphore.Wait(); + using(var connection = new SqliteConnection($"Data Source={_path};")) { - using(var connection = new SqliteConnection($"Data Source={_path};")) + connection.Open(); + //todo - sql injection dont copy this.. + var deleteSql = $"delete from logs where id >= {indexOfCommand};"; + using(var deleteCommand = new SqliteCommand(deleteSql, connection)) { - connection.Open(); - //todo - sql injection dont copy this.. - var deleteSql = $"delete from logs where id >= {indexOfCommand};"; - using(var deleteCommand = new SqliteCommand(deleteSql, connection)) - { - var result = deleteCommand.ExecuteNonQuery(); - } + var result = await deleteCommand.ExecuteNonQueryAsync(); } } + _sempaphore.Release(); } } } \ No newline at end of file diff --git a/test/Rafty.IntegrationTests/SqlLiteLogTests.cs b/test/Rafty.IntegrationTests/SqlLiteLogTests.cs index f0e7e64..e9b16b7 100644 --- a/test/Rafty.IntegrationTests/SqlLiteLogTests.cs +++ b/test/Rafty.IntegrationTests/SqlLiteLogTests.cs @@ -2,6 +2,7 @@ namespace Rafty.UnitTests { using System; using System.IO; + using System.Threading.Tasks; using Log; using Rafty.Infrastructure; using Rafty.IntegrationTests; @@ -23,98 +24,98 @@ public SqlLiteLogTests() public void ShouldInitialiseCorrectly() { var path = Guid.NewGuid().ToString(); - _log.LastLogIndex.ShouldBe(1); - _log.LastLogTerm.ShouldBe(0); + _log.LastLogIndex().Result.ShouldBe(1); + _log.LastLogTerm().Result.ShouldBe(0); } [Fact] - public void ShouldApplyLog() + public async Task ShouldApplyLog() { - var index = _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + var index = await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); index.ShouldBe(1); } [Fact] - public void ShouldSetLastLogIndex() + public async Task ShouldSetLastLogIndex() { - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.LastLogIndex.ShouldBe(2); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + _log.LastLogIndex().Result.ShouldBe(2); } [Fact] - public void ShouldSetLastLogTerm() + public async Task ShouldSetLastLogTerm() { - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 2)); - _log.LastLogTerm.ShouldBe(2); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 2)); + _log.LastLogTerm().Result.ShouldBe(2); } [Fact] - public void ShouldGetTermAtIndex() + public async Task ShouldGetTermAtIndex() { - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.GetTermAtIndex(1).ShouldBe(1); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + _log.GetTermAtIndex(1).Result.ShouldBe(1); } [Fact] - public void ShouldDeleteConflict() + public async Task ShouldDeleteConflict() { - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 2)); - _log.Count.ShouldBe(0); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 2)); + _log.Count().Result.ShouldBe(0); } [Fact] - public void ShouldNotDeleteConflict() + public async Task ShouldNotDeleteConflict() { - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Count.ShouldBe(1); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 1)); + _log.Count().Result.ShouldBe(1); } [Fact] - public void ShouldDeleteConflictAndSubsequentLogs() + public async Task ShouldDeleteConflictAndSubsequentLogs() { - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 2)); - _log.Count.ShouldBe(0); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 2)); + _log.Count().Result.ShouldBe(0); } [Fact] - public void ShouldDeleteConflictAndSubsequentLogsFromMidPoint() + public async Task ShouldDeleteConflictAndSubsequentLogsFromMidPoint() { - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.DeleteConflictsFromThisLog(4, new LogEntry(new FakeCommand("test"), typeof(string), 2)); - _log.Count.ShouldBe(3); - _log.Get(1).Term.ShouldBe(1); - _log.Get(2).Term.ShouldBe(1); - _log.Get(3).Term.ShouldBe(1); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.DeleteConflictsFromThisLog(4, new LogEntry(new FakeCommand("test"), typeof(string), 2)); + _log.Count().Result.ShouldBe(3); + _log.Get(1).Result.Term.ShouldBe(1); + _log.Get(2).Result.Term.ShouldBe(1); + _log.Get(3).Result.Term.ShouldBe(1); } [Fact] - public void ShouldGetFrom() + public async Task ShouldGetFrom() { - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - var logs = _log.GetFrom(3); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + var logs = await _log.GetFrom(3); logs.Count.ShouldBe(3); } [Fact] - public void ShouldRemoveFromLog() + public async Task ShouldRemoveFromLog() { - var index = _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Remove(index); - _log.Count.ShouldBe(0); + var index = await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Remove(index); + _log.Count().Result.ShouldBe(0); } public void Dispose() { diff --git a/test/Rafty.IntegrationTests/Startup.cs b/test/Rafty.IntegrationTests/Startup.cs index e0bda94..03a9cdf 100644 --- a/test/Rafty.IntegrationTests/Startup.cs +++ b/test/Rafty.IntegrationTests/Startup.cs @@ -70,7 +70,7 @@ public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerF var content = reader.ReadToEnd(); var appendEntries = JsonConvert.DeserializeObject(content, jsonSerializerSettings); logger.LogInformation(new EventId(1), null, $"{baseSchemeUrlAndPort}/appendentries called, my state is {n.State.GetType().FullName}"); - var appendEntriesResponse = n.Handle(appendEntries); + var appendEntriesResponse = await n.Handle(appendEntries); var json = JsonConvert.SerializeObject(appendEntriesResponse); await context.Response.WriteAsync(json); reader.Dispose(); @@ -82,7 +82,7 @@ public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerF var reader = new StreamReader(context.Request.Body); var requestVote = JsonConvert.DeserializeObject(reader.ReadToEnd(), jsonSerializerSettings); logger.LogInformation(new EventId(2), null, $"{baseSchemeUrlAndPort}/requestvote called, my state is {n.State.GetType().FullName}"); - var requestVoteResponse = n.Handle(requestVote); + var requestVoteResponse = await n.Handle(requestVote); var json = JsonConvert.SerializeObject(requestVoteResponse); await context.Response.WriteAsync(json); reader.Dispose(); @@ -94,7 +94,7 @@ public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerF var reader = new StreamReader(context.Request.Body); var command = JsonConvert.DeserializeObject(reader.ReadToEnd(), jsonSerializerSettings); logger.LogInformation(new EventId(3), null, $"{baseSchemeUrlAndPort}/command called, my state is {n.State.GetType().FullName}"); - var commandResponse = n.Accept(command); + var commandResponse = await n.Accept(command); var json = JsonConvert.SerializeObject(commandResponse); await context.Response.WriteAsync(json); reader.Dispose(); diff --git a/test/Rafty.IntegrationTests/Tests.cs b/test/Rafty.IntegrationTests/Tests.cs index f214861..a138693 100644 --- a/test/Rafty.IntegrationTests/Tests.cs +++ b/test/Rafty.IntegrationTests/Tests.cs @@ -15,6 +15,8 @@ namespace Rafty.IntegrationTests { + using System.Threading.Tasks; + public class Tests : IDisposable { private readonly List _builders; @@ -30,12 +32,12 @@ public Tests() } [Fact] - public void ShouldPersistCommandToFiveServers() + public async Task ShouldPersistCommandToFiveServers() { var command = new FakeCommand("WHATS UP DOC?"); - GivenFiveServersAreRunning(); - WhenISendACommandIntoTheCluster(command); - ThenTheCommandIsReplicatedToAllStateMachines(command); + await GivenFiveServersAreRunning(); + await WhenISendACommandIntoTheCluster(command); + await ThenTheCommandIsReplicatedToAllStateMachines(command); } private void GivenAServerIsRunning(string url) @@ -58,9 +60,9 @@ private void GivenAServerIsRunning(string url) _builders.Add(builder); } - private void GivenFiveServersAreRunning() + private async Task GivenFiveServersAreRunning() { - var bytes = File.ReadAllText("peers.json"); + var bytes = await File.ReadAllTextAsync("peers.json"); _peers = JsonConvert.DeserializeObject(bytes); foreach (var peer in _peers.Peers) @@ -71,9 +73,9 @@ private void GivenFiveServersAreRunning() } } - private void WhenISendACommandIntoTheCluster(FakeCommand command) + private async Task WhenISendACommandIntoTheCluster(FakeCommand command) { - bool SendCommand() + async Task SendCommand() { try { @@ -82,9 +84,9 @@ bool SendCommand() var httpContent = new StringContent(json); using (var httpClient = new HttpClient()) { - var response = httpClient.PostAsync($"{p.HostAndPort}/command", httpContent).GetAwaiter().GetResult(); + var response = await httpClient.PostAsync($"{p.HostAndPort}/command", httpContent); response.EnsureSuccessStatusCode(); - var content = response.Content.ReadAsStringAsync().GetAwaiter().GetResult(); + var content = await response.Content.ReadAsStringAsync(); var error = JsonConvert.DeserializeObject>(content); if (!string.IsNullOrEmpty(error.Error)) { @@ -101,28 +103,35 @@ bool SendCommand() } } - var leaderElectedAndCommandReceived = WaitFor(20000).Until(SendCommand); + var leaderElectedAndCommandReceived = await WaitFor(20000).Until(async () => await SendCommand()); leaderElectedAndCommandReceived.ShouldBeTrue(); } - private void ThenTheCommandIsReplicatedToAllStateMachines(FakeCommand command) + private async Task ThenTheCommandIsReplicatedToAllStateMachines(FakeCommand command) { - bool CommandCalledOnAllStateMachines() + async Task CommandCalledOnAllStateMachines() { - var passed = 0; - foreach (var peer in _peers.Peers) + try { - var fsmData = File.ReadAllText(peer.HostAndPort.Replace("/", "").Replace(":", "")); - fsmData.ShouldNotBeNullOrEmpty(); - var fakeCommand = JsonConvert.DeserializeObject(fsmData); - fakeCommand.Value.ShouldBe(command.Value); - passed++; - } + var passed = 0; + foreach (var peer in _peers.Peers) + { + var fsmData = await File.ReadAllTextAsync(peer.HostAndPort.Replace("/", "").Replace(":", "")); + fsmData.ShouldNotBeNullOrEmpty(); + var fakeCommand = JsonConvert.DeserializeObject(fsmData); + fakeCommand.Value.ShouldBe(command.Value); + passed++; + } - return passed == 5; + return passed == 5; + } + catch (Exception e) + { + return false; + } } - - var commandOnAllStateMachines = WaitFor(20000).Until(CommandCalledOnAllStateMachines); + Thread.Sleep(5000); + var commandOnAllStateMachines = await WaitFor(20000).Until(async () => await CommandCalledOnAllStateMachines()); commandOnAllStateMachines.ShouldBeTrue(); } diff --git a/test/Rafty.UnitTests/AllServersApplyToStateMachineTests.cs b/test/Rafty.UnitTests/AllServersApplyToStateMachineTests.cs index 3e2724d..f648829 100644 --- a/test/Rafty.UnitTests/AllServersApplyToStateMachineTests.cs +++ b/test/Rafty.UnitTests/AllServersApplyToStateMachineTests.cs @@ -7,6 +7,7 @@ using Rafty.FiniteStateMachine; using Rafty.Log; using Rafty.Concensus.States; +using System.Threading.Tasks; namespace Rafty.UnitTests { @@ -35,7 +36,7 @@ public AllServersApplyToStateMachineTests() } [Fact] - public void FollowerShouldApplyLogsToFsm() + public async Task FollowerShouldApplyLogsToFsm() { var currentState = new CurrentState(Guid.NewGuid().ToString(), 0, default(string), 0, 0, default(string)); var fsm = new Rafty.FiniteStateMachine.InMemoryStateMachine(); @@ -49,7 +50,7 @@ public void FollowerShouldApplyLogsToFsm() .WithEntry(log) .Build(); //assume node has added the log.. - _log.Apply(log); + await _log.Apply(log); var appendEntriesResponse = follower.Handle(appendEntries); follower.CurrentState.CurrentTerm.ShouldBe(1); follower.CurrentState.LastApplied.ShouldBe(1); @@ -57,7 +58,7 @@ public void FollowerShouldApplyLogsToFsm() } [Fact] - public void CandidateShouldApplyLogsToFsm() + public async Task CandidateShouldApplyLogsToFsm() { var currentState = new CurrentState(Guid.NewGuid().ToString(), 0, default(string), 0, 0, default(string)); var fsm = new Rafty.FiniteStateMachine.InMemoryStateMachine(); @@ -71,7 +72,7 @@ public void CandidateShouldApplyLogsToFsm() .WithLeaderCommitIndex(1) .Build(); //assume node has added the log.. - _log.Apply(log); + await _log.Apply(log); var appendEntriesResponse = candidate.Handle(appendEntries); candidate.CurrentState.CurrentTerm.ShouldBe(1); candidate.CurrentState.LastApplied.ShouldBe(1); @@ -82,7 +83,7 @@ public void CandidateShouldApplyLogsToFsm() [Fact] - public void LeaderShouldApplyLogsToFsm() + public async Task LeaderShouldApplyLogsToFsm() { var currentState = new CurrentState(Guid.NewGuid().ToString(), 0, default(string), 0, 0, default(string)); var fsm = new Rafty.FiniteStateMachine.InMemoryStateMachine(); @@ -96,7 +97,7 @@ public void LeaderShouldApplyLogsToFsm() .WithLeaderCommitIndex(1) .Build(); //assume node has added the log.. - _log.Apply(log); + await _log.Apply(log); var appendEntriesResponse = leader.Handle(appendEntries); leader.CurrentState.CurrentTerm.ShouldBe(1); leader.CurrentState.LastApplied.ShouldBe(1); diff --git a/test/Rafty.UnitTests/AppendEntriesTests.cs b/test/Rafty.UnitTests/AppendEntriesTests.cs index 4cb5962..adb0ff2 100644 --- a/test/Rafty.UnitTests/AppendEntriesTests.cs +++ b/test/Rafty.UnitTests/AppendEntriesTests.cs @@ -9,6 +9,8 @@ namespace Rafty.UnitTests { + using System.Threading.Tasks; + public class AppendEntriesTests { /* @@ -45,35 +47,35 @@ public AppendEntriesTests() } [Fact] - public void ShouldReplyFalseIfRpcTermLessThanCurrentTerm() + public async Task ShouldReplyFalseIfRpcTermLessThanCurrentTerm() { _currentState = new CurrentState(Guid.NewGuid().ToString(), 1, default(string), 0, 0, default(string)); var appendEntriesRpc = new AppendEntriesBuilder().WithTerm(0).Build(); var follower = new Follower(_currentState, _fsm, _log, _random, _node, _settings, _rules, _peers); - var appendEntriesResponse = follower.Handle(appendEntriesRpc); + var appendEntriesResponse = await follower.Handle(appendEntriesRpc); appendEntriesResponse.Success.ShouldBe(false); appendEntriesResponse.Term.ShouldBe(1); } [Fact] - public void ShouldReplyFalseIfLogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesRpcPrevLogTerm() + public async Task ShouldReplyFalseIfLogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesRpcPrevLogTerm() { _currentState = new CurrentState(Guid.NewGuid().ToString(), 2, default(string), 0, 0, default(string)); - _log.Apply(new LogEntry(new FakeCommand(""), typeof(string), 2)); + await _log.Apply(new LogEntry(new FakeCommand(""), typeof(string), 2)); var appendEntriesRpc = new AppendEntriesBuilder().WithTerm(2).WithPreviousLogIndex(1).WithPreviousLogTerm(1).Build(); var follower = new Follower(_currentState, _fsm, _log, _random, _node, _settings, _rules, _peers); - var appendEntriesResponse = follower.Handle(appendEntriesRpc); + var appendEntriesResponse = await follower.Handle(appendEntriesRpc); appendEntriesResponse.Success.ShouldBe(false); appendEntriesResponse.Term.ShouldBe(2); } [Fact] - public void ShouldDeleteExistingEntryIfItConflictsWithNewOne() + public async Task ShouldDeleteExistingEntryIfItConflictsWithNewOne() { _currentState = new CurrentState(Guid.NewGuid().ToString(), 1, default(string), 2, 0, default(string)); - _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 0"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 1"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 2"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 0"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 1"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 2"), typeof(string), 1)); var appendEntriesRpc = new AppendEntriesBuilder() .WithEntry(new LogEntry(new FakeCommand("term 2 commit index 2"), typeof(string),2)) .WithTerm(2) @@ -81,18 +83,18 @@ public void ShouldDeleteExistingEntryIfItConflictsWithNewOne() .WithPreviousLogTerm(1) .Build(); var follower = new Follower(_currentState, _fsm, _log, _random, _node, _settings, _rules, _peers); - var appendEntriesResponse = follower.Handle(appendEntriesRpc); + var appendEntriesResponse = await follower.Handle(appendEntriesRpc); appendEntriesResponse.Success.ShouldBe(true); appendEntriesResponse.Term.ShouldBe(2); } [Fact] - public void ShouldDeleteExistingEntryIfItConflictsWithNewOneAndAppendNewEntries() + public async Task ShouldDeleteExistingEntryIfItConflictsWithNewOneAndAppendNewEntries() { _currentState = new CurrentState(Guid.NewGuid().ToString(), 1, default(string), 0, 0, default(string)); - _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 0"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 1"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 2"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 0"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 1"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 2"), typeof(string), 1)); var appendEntriesRpc = new AppendEntriesBuilder() .WithEntry(new LogEntry(new FakeCommand("term 2 commit index 2"), typeof(string), 2)) .WithTerm(2) @@ -100,17 +102,17 @@ public void ShouldDeleteExistingEntryIfItConflictsWithNewOneAndAppendNewEntries( .WithPreviousLogTerm(1) .Build(); var follower = new Follower(_currentState, _fsm, _log, _random, _node, _settings, _rules, _peers); - var appendEntriesResponse = follower.Handle(appendEntriesRpc); + var appendEntriesResponse = await follower.Handle(appendEntriesRpc); appendEntriesResponse.Success.ShouldBe(true); appendEntriesResponse.Term.ShouldBe(2); - _log.GetTermAtIndex(2).ShouldBe(2); + _log.GetTermAtIndex(2).Result.ShouldBe(2); } [Fact] - public void ShouldAppendAnyEntriesNotInTheLog() + public async Task ShouldAppendAnyEntriesNotInTheLog() { _currentState = new CurrentState(Guid.NewGuid().ToString(), 1, default(string), 0, 0, default(string)); - _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 0"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 0"), typeof(string), 1)); var appendEntriesRpc = new AppendEntriesBuilder() .WithEntry(new LogEntry(new FakeCommand("term 1 commit index 1"), typeof(string), 1)) .WithTerm(1) @@ -119,19 +121,19 @@ public void ShouldAppendAnyEntriesNotInTheLog() .WithLeaderId(Guid.NewGuid().ToString()) .Build(); var follower = new Follower(_currentState, _fsm, _log, _random, _node, _settings, _rules, _peers); - var appendEntriesResponse = follower.Handle(appendEntriesRpc); + var appendEntriesResponse = await follower.Handle(appendEntriesRpc); appendEntriesResponse.Success.ShouldBe(true); appendEntriesResponse.Term.ShouldBe(1); - _log.GetTermAtIndex(1).ShouldBe(1); + _log.GetTermAtIndex(1).Result.ShouldBe(1); follower.CurrentState.LeaderId.ShouldBe(appendEntriesRpc.LeaderId); } [Fact] - public void FollowerShouldSetCommitIndexIfLeaderCommitGreaterThanCommitIndex() + public async Task FollowerShouldSetCommitIndexIfLeaderCommitGreaterThanCommitIndex() { _currentState = new CurrentState(Guid.NewGuid().ToString(), 1, default(string), 0, 0, default(string)); var log = new LogEntry(new FakeCommand("term 1 commit index 0"), typeof(string), 1); - _log.Apply(log); + await _log.Apply(log); var appendEntriesRpc = new AppendEntriesBuilder() .WithEntry(log) .WithTerm(1) @@ -141,17 +143,17 @@ public void FollowerShouldSetCommitIndexIfLeaderCommitGreaterThanCommitIndex() .Build(); //assume node has applied log.. var follower = new Follower(_currentState, _fsm, _log, _random, _node, _settings, _rules, _peers); - var appendEntriesResponse = follower.Handle(appendEntriesRpc); + var appendEntriesResponse = await follower.Handle(appendEntriesRpc); follower.CurrentState.CommitIndex.ShouldBe(1); } [Fact] - public void CandidateShouldSetCommitIndexIfLeaderCommitGreaterThanCommitIndex() + public async Task CandidateShouldSetCommitIndexIfLeaderCommitGreaterThanCommitIndex() { _currentState = new CurrentState(Guid.NewGuid().ToString(), 0, default(string), 0, 0, default(string)); //assume log applied by node? var log = new LogEntry(new FakeCommand("term 1 commit index 0"), typeof(string), 1); - _log.Apply(log); + await _log.Apply(log); var appendEntriesRpc = new AppendEntriesBuilder() .WithEntry(log) .WithTerm(1) @@ -161,18 +163,18 @@ public void CandidateShouldSetCommitIndexIfLeaderCommitGreaterThanCommitIndex() .WithLeaderId(Guid.NewGuid().ToString()) .Build(); var candidate = new Candidate(_currentState, _fsm, _peers, _log, _random, _node, _settings, _rules); - var appendEntriesResponse = candidate.Handle(appendEntriesRpc); + var appendEntriesResponse = await candidate.Handle(appendEntriesRpc); candidate.CurrentState.CommitIndex.ShouldBe(1); candidate.CurrentState.LeaderId.ShouldBe(appendEntriesRpc.LeaderId); } [Fact] - public void LeaderShouldSetCommitIndexIfLeaderCommitGreaterThanCommitIndex() + public async Task LeaderShouldSetCommitIndexIfLeaderCommitGreaterThanCommitIndex() { _currentState = new CurrentState(Guid.NewGuid().ToString(), 0, default(string), 0, 0, default(string)); //assume log applied by node? var log = new LogEntry(new FakeCommand("term 1 commit index 0"), typeof(string), 1); - _log.Apply(log); + await _log.Apply(log); var appendEntriesRpc = new AppendEntriesBuilder() .WithEntry(log) .WithTerm(1) @@ -182,7 +184,7 @@ public void LeaderShouldSetCommitIndexIfLeaderCommitGreaterThanCommitIndex() .WithLeaderId(Guid.NewGuid().ToString()) .Build(); var leader = new Leader(_currentState, _fsm, (s) => _peers, _log, _node, _settings, _rules); - var state = leader.Handle(appendEntriesRpc); + var state = await leader.Handle(appendEntriesRpc); leader.CurrentState.CommitIndex.ShouldBe(1); leader.CurrentState.LeaderId.ShouldBe(appendEntriesRpc.LeaderId); } diff --git a/test/Rafty.UnitTests/CandidateTests.cs b/test/Rafty.UnitTests/CandidateTests.cs index 745571a..4637120 100644 --- a/test/Rafty.UnitTests/CandidateTests.cs +++ b/test/Rafty.UnitTests/CandidateTests.cs @@ -62,7 +62,7 @@ public void ShouldStartNewElectionIfTimesout() } [Fact] - public void ShouldBecomeFollowerIfAppendEntriesReceivedFromNewLeaderAndTermGreaterThanCurrentTerm() + public async Task ShouldBecomeFollowerIfAppendEntriesReceivedFromNewLeaderAndTermGreaterThanCurrentTerm() { _peers = new List { @@ -73,7 +73,7 @@ public void ShouldBecomeFollowerIfAppendEntriesReceivedFromNewLeaderAndTermGreat }; var candidate = new Candidate(_currentState, _fsm, _peers, _log, _random, _node, _settings, _rules); candidate.BeginElection(); - var appendEntriesResponse = candidate.Handle(new AppendEntriesBuilder().WithTerm(2).Build()); + var appendEntriesResponse = await candidate.Handle(new AppendEntriesBuilder().WithTerm(2).Build()); appendEntriesResponse.Success.ShouldBeTrue(); var node = (NothingNode)_node; node.BecomeFollowerCount.ShouldBe(1); @@ -96,7 +96,7 @@ public void ShouldBecomeFollowerIfRequestVoteResponseTermGreaterThanCurrentTerm( } [Fact] - public void ShouldNotBecomeFollowerIfAppendEntriesReceivedFromNewLeaderAndTermLessThanCurrentTerm() + public async Task ShouldNotBecomeFollowerIfAppendEntriesReceivedFromNewLeaderAndTermLessThanCurrentTerm() { _peers = new List { @@ -107,7 +107,7 @@ public void ShouldNotBecomeFollowerIfAppendEntriesReceivedFromNewLeaderAndTermLe }; var candidate = new Candidate(_currentState, _fsm, _peers, _log, _random, _node, _settings, _rules); candidate.BeginElection(); - var appendEntriesResponse = candidate.Handle(new AppendEntriesBuilder().WithTerm(0).Build()); + var appendEntriesResponse = await candidate.Handle(new AppendEntriesBuilder().WithTerm(0).Build()); appendEntriesResponse.Success.ShouldBeFalse(); var node = (NothingNode)_node; node.BecomeFollowerCount.ShouldBe(0); @@ -208,27 +208,27 @@ public void ShouldVoteForSelfWhenElectionStarts() } [Fact] - public void ShouldVoteForNewCandidateInAnotherTermsElection() + public async Task ShouldVoteForNewCandidateInAnotherTermsElection() { _node = new NothingNode(); _currentState = new CurrentState(Guid.NewGuid().ToString(), 0, default(string), 0, 0, default(string)); var candidate = new Candidate(_currentState, _fsm, _peers, _log, _random, _node, _settings, _rules); var requestVote = new RequestVoteBuilder().WithTerm(0).WithCandidateId(Guid.NewGuid().ToString()).WithLastLogIndex(1).Build(); - var requestVoteResponse = candidate.Handle(requestVote); + var requestVoteResponse = await candidate.Handle(requestVote); candidate.CurrentState.VotedFor.ShouldBe(requestVote.CandidateId); requestVoteResponse.VoteGranted.ShouldBeTrue(); requestVote = new RequestVoteBuilder().WithTerm(1).WithCandidateId(Guid.NewGuid().ToString()).WithLastLogIndex(1).Build(); - requestVoteResponse = candidate.Handle(requestVote); + requestVoteResponse = await candidate.Handle(requestVote); requestVoteResponse.VoteGranted.ShouldBeTrue(); candidate.CurrentState.VotedFor.ShouldBe(requestVote.CandidateId); } [Fact] - public void CandidateShouldTellClientToRetryCommand() + public async Task CandidateShouldTellClientToRetryCommand() { _node = new NothingNode(); var candidate = new Candidate(_currentState, _fsm, _peers, _log, _random, _node, _settings, _rules); - var response = candidate.Accept(new FakeCommand()); + var response = await candidate.Accept(new FakeCommand()); var error = (ErrorResponse)response; error.Error.ShouldBe("Please retry command later. Currently electing new a new leader."); } diff --git a/test/Rafty.UnitTests/FakePeer.cs b/test/Rafty.UnitTests/FakePeer.cs index 296e35a..a7e89ee 100644 --- a/test/Rafty.UnitTests/FakePeer.cs +++ b/test/Rafty.UnitTests/FakePeer.cs @@ -4,6 +4,7 @@ namespace Rafty.UnitTests { using System; using System.Collections.Generic; + using System.Threading.Tasks; using Concensus; internal class FakePeer : IPeer @@ -68,14 +69,14 @@ public FakePeer(bool grantVote, bool appendEntry, bool appendEntryTwo, bool appe public string Id => _id; - public RequestVoteResponse Request(RequestVote requestVote) + public async Task Request(RequestVote requestVote) { var response = new RequestVoteResponse(_grantVote, _term); RequestVoteResponses.Add(response); return response; } - public AppendEntriesResponse Request(AppendEntries appendEntries) + public async Task Request(AppendEntries appendEntries) { AppendEntriesResponse response; if (AppendEntriesResponses.Count == 1) @@ -97,7 +98,7 @@ public AppendEntriesResponse Request(AppendEntries appendEntries) return response; } - public Response Request(T command) where T : ICommand + public async Task> Request(T command) where T : ICommand { ReceivedCommands++; return new OkResponse(command); diff --git a/test/Rafty.UnitTests/FollowerTests.cs b/test/Rafty.UnitTests/FollowerTests.cs index 1887dd7..e972ab5 100644 --- a/test/Rafty.UnitTests/FollowerTests.cs +++ b/test/Rafty.UnitTests/FollowerTests.cs @@ -7,6 +7,7 @@ namespace Rafty.UnitTests { using System; using System.Collections.Generic; + using System.Threading.Tasks; using Concensus; using Rafty.Concensus.States; using Rafty.FiniteStateMachine; @@ -134,23 +135,23 @@ public void ShouldUpdateVotedFor() } [Fact] - public void ShouldVoteForNewCandidateInAnotherTermsElection() + public async Task ShouldVoteForNewCandidateInAnotherTermsElection() { _node = new NothingNode(); _currentState = new CurrentState(Guid.NewGuid().ToString(), 0, default(string), 0, 0, default(string)); var follower = new Follower(_currentState, _fsm, _log, _random, _node, _settings,_rules, _peers); var requestVote = new RequestVoteBuilder().WithTerm(0).WithCandidateId(Guid.NewGuid().ToString()).WithLastLogIndex(1).Build(); - var requestVoteResponse = follower.Handle(requestVote); + var requestVoteResponse = await follower.Handle(requestVote); follower.CurrentState.VotedFor.ShouldBe(requestVote.CandidateId); requestVoteResponse.VoteGranted.ShouldBeTrue(); requestVote = new RequestVoteBuilder().WithTerm(1).WithCandidateId(Guid.NewGuid().ToString()).WithLastLogIndex(1).Build(); - requestVoteResponse = follower.Handle(requestVote); + requestVoteResponse = await follower.Handle(requestVote); requestVoteResponse.VoteGranted.ShouldBeTrue(); follower.CurrentState.VotedFor.ShouldBe(requestVote.CandidateId); } [Fact] - public void FollowerShouldForwardCommandToLeader() + public async Task FollowerShouldForwardCommandToLeader() { _node = new NothingNode(); var leaderId = Guid.NewGuid().ToString(); @@ -161,18 +162,18 @@ public void FollowerShouldForwardCommandToLeader() }; _currentState = new CurrentState(_currentState.Id, _currentState.CurrentTerm, _currentState.VotedFor, _currentState.CommitIndex, _currentState.LastApplied, leaderId); var follower = new Follower(_currentState, _fsm, _log, _random, _node, _settings,_rules, _peers); - var response = follower.Accept(new FakeCommand()); + var response = await follower.Accept(new FakeCommand()); response.ShouldBeOfType>(); leader.ReceivedCommands.ShouldBe(1); } [Fact] - public void FollowerShouldReturnRetryIfNoLeader() + public async Task FollowerShouldReturnRetryIfNoLeader() { _node = new NothingNode(); _currentState = new CurrentState(_currentState.Id, _currentState.CurrentTerm, _currentState.VotedFor, _currentState.CommitIndex, _currentState.LastApplied, _currentState.LeaderId); var follower = new Follower(_currentState, _fsm, _log, _random, _node, _settings,_rules, _peers); - var response = follower.Accept(new FakeCommand()); + var response = await follower.Accept(new FakeCommand()); var error = (ErrorResponse)response; error.Error.ShouldBe("Please retry command later. Unable to find leader."); } diff --git a/test/Rafty.UnitTests/InMemoryLogTests.cs b/test/Rafty.UnitTests/InMemoryLogTests.cs index 1c1f01d..f4f5e4f 100644 --- a/test/Rafty.UnitTests/InMemoryLogTests.cs +++ b/test/Rafty.UnitTests/InMemoryLogTests.cs @@ -1,5 +1,6 @@ namespace Rafty.UnitTests { + using System.Threading.Tasks; using Log; using Shouldly; using Xunit; @@ -10,81 +11,81 @@ public class InMemoryLogTests public void ShouldInitialiseCorrectly() { var log = new InMemoryLog(); - log.LastLogIndex.ShouldBe(1); - log.LastLogTerm.ShouldBe(0); + log.LastLogIndex().Result.ShouldBe(1); + log.LastLogTerm().Result.ShouldBe(0); } [Fact] - public void ShouldApplyLog() + public async Task ShouldApplyLog() { var log = new InMemoryLog(); - var index = log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + var index = await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); index.ShouldBe(1); } [Fact] - public void ShouldSetLastLogIndex() + public async Task ShouldSetLastLogIndex() { var log = new InMemoryLog(); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.LastLogIndex.ShouldBe(1); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + log.LastLogIndex().Result.ShouldBe(1); } [Fact] - public void ShouldSetLastLogTerm() + public async Task ShouldSetLastLogTerm() { var log = new InMemoryLog(); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.LastLogTerm.ShouldBe(1); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + log.LastLogTerm().Result.ShouldBe(1); } [Fact] - public void ShouldGetTermAtIndex() + public async Task ShouldGetTermAtIndex() { var log = new InMemoryLog(); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.GetTermAtIndex(1).ShouldBe(1); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + log.GetTermAtIndex(1).Result.ShouldBe(1); } [Fact] - public void ShouldDeleteConflict() + public async Task ShouldDeleteConflict() { var log = new InMemoryLog(); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 2)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 2)); log.ExposedForTesting.Count.ShouldBe(0); } [Fact] - public void ShouldNotDeleteConflict() + public async Task ShouldNotDeleteConflict() { var log = new InMemoryLog(); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 1)); log.ExposedForTesting.Count.ShouldBe(1); } [Fact] - public void ShouldDeleteConflictAndSubsequentLogs() + public async Task ShouldDeleteConflictAndSubsequentLogs() { var log = new InMemoryLog(); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 2)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 2)); log.ExposedForTesting.Count.ShouldBe(0); } [Fact] - public void ShouldDeleteConflictAndSubsequentLogsFromMidPoint() + public async Task ShouldDeleteConflictAndSubsequentLogsFromMidPoint() { var log = new InMemoryLog(); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.DeleteConflictsFromThisLog(4, new LogEntry(new FakeCommand("test"), typeof(string), 2)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.DeleteConflictsFromThisLog(4, new LogEntry(new FakeCommand("test"), typeof(string), 2)); log.ExposedForTesting.Count.ShouldBe(3); log.ExposedForTesting[1].Term.ShouldBe(1); log.ExposedForTesting[2].Term.ShouldBe(1); @@ -92,12 +93,12 @@ public void ShouldDeleteConflictAndSubsequentLogsFromMidPoint() } [Fact] - public void ShouldRemoveFromLog() + public async Task ShouldRemoveFromLog() { var log = new InMemoryLog(); - var index = log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.Remove(index); - log.Count.ShouldBe(0); + var index = await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.Remove(index); + log.Count().Result.ShouldBe(0); } } } \ No newline at end of file diff --git a/test/Rafty.UnitTests/LeaderTests.cs b/test/Rafty.UnitTests/LeaderTests.cs index d9c4186..339f54e 100644 --- a/test/Rafty.UnitTests/LeaderTests.cs +++ b/test/Rafty.UnitTests/LeaderTests.cs @@ -67,7 +67,7 @@ bool TestPeers(List peers) } [Fact] - public void ShouldAppendCommandToLocalLog() + public async Task ShouldAppendCommandToLocalLog() { _peers = new List(); for (var i = 0; i < 4; i++) @@ -79,12 +79,12 @@ public void ShouldAppendCommandToLocalLog() var log = new InMemoryLog(); _currentState = new CurrentState(_id, 0, default(string), 0, 0, default(string)); var leader = new Leader(_currentState, _fsm, (s) => _peers, log, _node, _settings, _rules); - leader.Accept(new FakeCommand()); + await leader.Accept(new FakeCommand()); log.ExposedForTesting.Count.ShouldBe(1); } [Fact] - public void ShouldApplyCommandToStateMachine() + public async Task ShouldApplyCommandToStateMachine() { _peers = new List(); for (var i = 0; i < 4; i++) @@ -96,7 +96,7 @@ public void ShouldApplyCommandToStateMachine() var log = new InMemoryLog(); _currentState = new CurrentState(_id, 0, default(string), 0, 0, default(string)); var leader = new Leader(_currentState, _fsm, (s) => _peers, log, _node, _settings, _rules); - var response = leader.Accept(new FakeCommand()); + var response = await leader.Accept(new FakeCommand()); log.ExposedForTesting.Count.ShouldBe(1); var fsm = (InMemoryStateMachine)_fsm; @@ -105,13 +105,13 @@ public void ShouldApplyCommandToStateMachine() } [Fact] - public void ShouldHandleCommandIfNoPeers() + public async Task ShouldHandleCommandIfNoPeers() { _peers = new List(); var log = new InMemoryLog(); _currentState = new CurrentState(_id, 0, default(string), 0, 0, default(string)); var leader = new Leader(_currentState, _fsm, (s) => _peers, log, _node, _settings, _rules); - var response = leader.Accept(new FakeCommand()); + var response = await leader.Accept(new FakeCommand()); log.ExposedForTesting.Count.ShouldBe(1); var fsm = (InMemoryStateMachine)_fsm; fsm.HandledLogEntries.ShouldBe(1); @@ -198,7 +198,7 @@ bool TestPeerStates() } [Fact] - public void ShouldSendAppendEntriesStartingAtNextIndex() + public async Task ShouldSendAppendEntriesStartingAtNextIndex() { _peers = new List(); for (var i = 0; i < 4; i++) @@ -208,19 +208,19 @@ public void ShouldSendAppendEntriesStartingAtNextIndex() //add 3 logs var logOne = new LogEntry(new FakeCommand("1"), typeof(string), 1); - _log.Apply(logOne); + await _log.Apply(logOne); var logTwo = new LogEntry(new FakeCommand("2"), typeof(string), 1); - _log.Apply(logTwo); + await _log.Apply(logTwo); var logThree = new LogEntry(new FakeCommand("3"), typeof(string), 1); - _log.Apply(logThree); + await _log.Apply(logThree); _currentState = new CurrentState(_id, 1, default(string), 2, 2, default(string)); var leader = new Leader(_currentState, _fsm, (s) => _peers, _log, _node, _settings, _rules); - var logs = _log.GetFrom(1); + var logs = await _log.GetFrom(1); logs.Count.ShouldBe(3); } [Fact] - public void ShouldUpdateMatchIndexAndNextIndexIfSuccessful() + public async Task ShouldUpdateMatchIndexAndNextIndexIfSuccessful() { _peers = new List(); for (var i = 0; i < 4; i++) @@ -230,11 +230,11 @@ public void ShouldUpdateMatchIndexAndNextIndexIfSuccessful() //add 3 logs _currentState = new CurrentState(_id, 1, default(string), 2, 2, default(string)); var logOne = new LogEntry(new FakeCommand("1"), typeof(string), 1); - _log.Apply(logOne); + await _log.Apply(logOne); var logTwo = new LogEntry(new FakeCommand("2"), typeof(string), 1); - _log.Apply(logTwo); + await _log.Apply(logTwo); var logThree = new LogEntry(new FakeCommand("3"), typeof(string), 1); - _log.Apply(logThree); + await _log.Apply(logThree); var leader = new Leader(_currentState, _fsm, (s) => _peers, _log, _node, _settings, _rules); bool FirstTest(List peerState) @@ -261,7 +261,7 @@ bool FirstTest(List peerState) } [Fact] - public void ShouldDecrementNextIndexAndRetry() + public async Task ShouldDecrementNextIndexAndRetry() { //create peers that will initially return false when asked to append entries... _peers = new List(); @@ -276,7 +276,7 @@ public void ShouldDecrementNextIndexAndRetry() var leader = new Leader(_currentState, _fsm, (s) => _peers, _log, _node, _settings, _rules); //send first command, this wont get commited because the guys are replying false - var task = Task.Run(async () => leader.Accept(new FakeCommand())); + var task = Task.Run(async () => await leader.Accept(new FakeCommand())); bool FirstTest(List peerState) { var passed = 0; @@ -337,7 +337,7 @@ bool SecondTest(List peerState) } //send another command, this wont get commited because the guys are replying false - task = Task.Run(async () => leader.Accept(new FakeCommand())); + task = Task.Run(async () => await leader.Accept(new FakeCommand())); bool ThirdTest(List peerState) { var passed = 0; @@ -391,7 +391,7 @@ bool FourthTest(List peerState) result.ShouldBeTrue(); //send another command - leader.Accept(new FakeCommand()); + await leader.Accept(new FakeCommand()); bool FirthTest(List peerState) { var passed = 0; @@ -416,7 +416,7 @@ bool FirthTest(List peerState) } [Fact] - public void ShouldSetCommitIndex() + public async Task ShouldSetCommitIndex() { _peers = new List(); for (var i = 0; i < 4; i++) @@ -428,9 +428,9 @@ public void ShouldSetCommitIndex() //add 3 logs _currentState = new CurrentState(_id, 1, default(string), 0, 0, default(string)); var leader = new Leader(_currentState, _fsm, (s) => _peers, _log, _node, _settings, _rules); - leader.Accept(new FakeCommand()); - leader.Accept(new FakeCommand()); - leader.Accept(new FakeCommand()); + await leader.Accept(new FakeCommand()); + await leader.Accept(new FakeCommand()); + await leader.Accept(new FakeCommand()); bool PeersTest(List peerState) { @@ -492,7 +492,7 @@ bool TestPeerStates(List peerState) [Fact] - public void ShouldReplicateCommand() + public async Task ShouldReplicateCommand() { _peers = new List(); for (var i = 0; i < 4; i++) @@ -502,7 +502,7 @@ public void ShouldReplicateCommand() _currentState = new CurrentState(_id, 1, default(string), 0, 0, default(string)); var leader = new Leader(_currentState,_fsm, (s) => _peers, _log, _node, _settings, _rules); var command = new FakeCommand(); - var response = leader.Accept(command); + var response = await leader.Accept(command); response.ShouldBeOfType>(); bool TestPeerStates(List peerState) { @@ -561,7 +561,7 @@ bool TestPeerStates(List peerState) } [Fact] - public void ShouldTimeoutAfterXSecondsIfCannotReplicateCommand() + public async Task ShouldTimeoutAfterXSecondsIfCannotReplicateCommand() { _peers = new List(); for (var i = 0; i < 4; i++) @@ -572,7 +572,7 @@ public void ShouldTimeoutAfterXSecondsIfCannotReplicateCommand() _settings = new InMemorySettingsBuilder().WithCommandTimeout(1).Build(); var leader = new Leader(_currentState,_fsm, (s) => _peers, _log, _node, _settings, _rules); var command = new FakeCommand(); - var response = leader.Accept(command); + var response = await leader.Accept(command); var error = (ErrorResponse)response; error.Error.ShouldBe("Unable to replicate command to peers due to timeout."); bool TestPeerStates(List peerState) @@ -595,12 +595,12 @@ bool TestPeerStates(List peerState) return passed == peerState.Count * 2; } var result = WaitFor(1000).Until(() => TestPeerStates(leader.PeerStates)); - _log.Count.ShouldBe(0); + _log.Count().Result.ShouldBe(0); result.ShouldBeTrue(); } [Fact] - public void ShouldTimeoutAfterXSecondsIfCannotReplicateCommandAndRollbackIndexes() + public async Task ShouldTimeoutAfterXSecondsIfCannotReplicateCommandAndRollbackIndexes() { _peers = new List(); for (var i = 0; i < 3; i++) @@ -614,7 +614,7 @@ public void ShouldTimeoutAfterXSecondsIfCannotReplicateCommandAndRollbackIndexes _settings = new InMemorySettingsBuilder().WithCommandTimeout(1).Build(); var leader = new Leader(_currentState,_fsm, (s) => _peers, _log, _node, _settings, _rules); var command = new FakeCommand(); - var response = leader.Accept(command); + var response = await leader.Accept(command); var error = (ErrorResponse)response; error.Error.ShouldBe("Unable to replicate command to peers due to timeout."); bool TestPeerStates(List peerState) @@ -637,7 +637,7 @@ bool TestPeerStates(List peerState) return passed == peerState.Count * 2; } var result = WaitFor(1000).Until(() => TestPeerStates(leader.PeerStates)); - _log.Count.ShouldBe(0); + _log.Count().Result.ShouldBe(0); result.ShouldBeTrue(); } } diff --git a/test/Rafty.UnitTests/NothingNode.cs b/test/Rafty.UnitTests/NothingNode.cs index e7c73c9..8f8f04d 100644 --- a/test/Rafty.UnitTests/NothingNode.cs +++ b/test/Rafty.UnitTests/NothingNode.cs @@ -4,6 +4,8 @@ namespace Rafty.UnitTests { + using System.Threading.Tasks; + public class NothingNode : INode { public IState State { get; } @@ -27,12 +29,12 @@ public void BecomeCandidate(CurrentState state) BecomeCandidateCount++; } - public AppendEntriesResponse Handle(AppendEntries appendEntries) + public async Task Handle(AppendEntries appendEntries) { return new AppendEntriesResponseBuilder().Build(); } - public RequestVoteResponse Handle(RequestVote requestVote) + public async Task Handle(RequestVote requestVote) { return new RequestVoteResponseBuilder().Build(); } @@ -47,7 +49,7 @@ public void Stop() throw new System.NotImplementedException(); } - public Response Accept(T command) where T : ICommand + public async Task> Accept(T command) where T : ICommand { throw new System.NotImplementedException(); } diff --git a/test/Rafty.UnitTests/RemoteControledPeer.cs b/test/Rafty.UnitTests/RemoteControledPeer.cs index 07cfa7d..7ba8cc7 100644 --- a/test/Rafty.UnitTests/RemoteControledPeer.cs +++ b/test/Rafty.UnitTests/RemoteControledPeer.cs @@ -4,6 +4,8 @@ namespace Rafty.UnitTests { + using System.Threading.Tasks; + public class RemoteControledPeer : IPeer { private RequestVoteResponse _requestVoteResponse; @@ -28,19 +30,19 @@ public void SetAppendEntriesResponse(AppendEntriesResponse appendEntriesResponse _appendEntriesResponse = appendEntriesResponse; } - public RequestVoteResponse Request(RequestVote requestVote) + public async Task Request(RequestVote requestVote) { RequestVoteResponses++; return _requestVoteResponse; } - public AppendEntriesResponse Request(AppendEntries appendEntries) + public async Task Request(AppendEntries appendEntries) { AppendEntriesResponses++; return _appendEntriesResponse; } - public Response Request(T command) where T : ICommand + public async Task> Request(T command) where T : ICommand { throw new NotImplementedException(); } diff --git a/test/Rafty.UnitTests/RequestVoteTests.cs b/test/Rafty.UnitTests/RequestVoteTests.cs index b03d5f8..a3ff665 100644 --- a/test/Rafty.UnitTests/RequestVoteTests.cs +++ b/test/Rafty.UnitTests/RequestVoteTests.cs @@ -10,6 +10,8 @@ namespace Rafty.UnitTests { + using System.Threading.Tasks; + public class RequestVoteTests : IDisposable { /* @@ -42,45 +44,45 @@ public void Dispose() } [Fact] - public void FollowerShouldReplyFalseIfTermIsLessThanCurrentTerm() + public async Task FollowerShouldReplyFalseIfTermIsLessThanCurrentTerm() { _currentState = new CurrentState(Guid.NewGuid().ToString(), 1, default(string), 1, 0, default(string)); var requestVoteRpc = new RequestVoteBuilder().WithTerm(0).Build(); var follower = new Follower(_currentState, _fsm, _log, _random, _node, _settings,_rules, _peers); - var requestVoteResponse = follower.Handle(requestVoteRpc); + var requestVoteResponse = await follower.Handle(requestVoteRpc); requestVoteResponse.VoteGranted.ShouldBe(false); requestVoteResponse.Term.ShouldBe(1); } [Fact] - public void FollowerShouldReplyFalseIfVotedForIsNotDefault() + public async Task FollowerShouldReplyFalseIfVotedForIsNotDefault() { _currentState = new CurrentState(Guid.NewGuid().ToString(), 1, Guid.NewGuid().ToString(), 1, 0, default(string)); var requestVoteRpc = new RequestVoteBuilder().WithTerm(0).Build(); var follower = new Follower(_currentState, _fsm, _log, _random, _node, _settings,_rules, _peers); - var requestVoteResponse = follower.Handle(requestVoteRpc); + var requestVoteResponse = await follower.Handle(requestVoteRpc); requestVoteResponse.VoteGranted.ShouldBe(false); requestVoteResponse.Term.ShouldBe(1); } [Fact] - public void FollowerShouldReplyFalseIfVotedForIsNotCandidateId() + public async Task FollowerShouldReplyFalseIfVotedForIsNotCandidateId() { _currentState = new CurrentState(Guid.NewGuid().ToString(), 1, Guid.NewGuid().ToString(), 1, 0, default(string)); var requestVoteRpc = new RequestVoteBuilder().WithCandidateId(Guid.NewGuid().ToString()).WithTerm(0).Build(); var follower = new Follower(_currentState, _fsm, _log, _random, _node, _settings,_rules, _peers); - var requestVoteResponse = follower.Handle(requestVoteRpc); + var requestVoteResponse = await follower.Handle(requestVoteRpc); requestVoteResponse.VoteGranted.ShouldBe(false); requestVoteResponse.Term.ShouldBe(1); } [Fact] - public void FollowerShouldGrantVote() + public async Task FollowerShouldGrantVote() { _currentState = new CurrentState(Guid.NewGuid().ToString(), 1, default(string), 1, 0, default(string)); var requestVoteRpc = new RequestVoteBuilder().WithLastLogIndex(1).WithLastLogTerm(0).WithTerm(1).Build(); var follower = new Follower(_currentState, _fsm, _log, _random, _node, _settings,_rules, _peers); - var requestVoteResponse = follower.Handle(requestVoteRpc); + var requestVoteResponse = await follower.Handle(requestVoteRpc); requestVoteResponse.VoteGranted.ShouldBe(true); requestVoteResponse.Term.ShouldBe(1); } diff --git a/test/Rafty.UnitTests/TestingNode.cs b/test/Rafty.UnitTests/TestingNode.cs index 2882199..3ec59a4 100644 --- a/test/Rafty.UnitTests/TestingNode.cs +++ b/test/Rafty.UnitTests/TestingNode.cs @@ -3,6 +3,7 @@ namespace Rafty.UnitTests { using System; + using System.Threading.Tasks; using Concensus; /* Followers(�5.2): � Respond to RPCs from candidates and leaders @@ -38,14 +39,14 @@ public void BecomeCandidate(CurrentState state) BecomeCandidateCount++; } - public AppendEntriesResponse Handle(AppendEntries appendEntries) + public async Task Handle(AppendEntries appendEntries) { - return State.Handle(appendEntries); + return await State.Handle(appendEntries); } - public RequestVoteResponse Handle(RequestVote requestVote) + public async Task Handle(RequestVote requestVote) { - return State.Handle(requestVote); + return await State.Handle(requestVote); } public void Start(string id) @@ -58,7 +59,7 @@ public void Stop() throw new NotImplementedException(); } - public Response Accept(T command) where T : ICommand + public async Task> Accept(T command) where T : ICommand { throw new NotImplementedException(); }