diff --git a/src/Rafty/Concensus/States/Candidate.cs b/src/Rafty/Concensus/States/Candidate.cs index 550bffd..2590a8e 100644 --- a/src/Rafty/Concensus/States/Candidate.cs +++ b/src/Rafty/Concensus/States/Candidate.cs @@ -84,18 +84,18 @@ public async Task Handle(AppendEntries appendEntries) return response.appendEntriesResponse; } - response = _rules.LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(appendEntries, _log, CurrentState); + response = await _rules.LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(appendEntries, _log, CurrentState); if(response.shouldReturn) { return response.appendEntriesResponse; } - _rules.DeleteAnyConflictsInLog(appendEntries, _log); + await _rules.DeleteAnyConflictsInLog(appendEntries, _log); - _rules.ApplyEntriesToLog(appendEntries, _log); + await _rules.ApplyEntriesToLog(appendEntries, _log); - var commitIndexAndLastApplied = _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState); + var commitIndexAndLastApplied = await _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState); await ApplyToStateMachine(commitIndexAndLastApplied.commitIndex, commitIndexAndLastApplied.lastApplied); @@ -129,7 +129,7 @@ public async Task Handle(RequestVote requestVote) return response.requestVoteResponse; } - response = LastLogIndexAndLastLogTermMatchesThis(requestVote); + response = await LastLogIndexAndLastLogTermMatchesThis(requestVote); if(response.shouldReturn) { @@ -234,7 +234,7 @@ private void ShouldBecomeLeader() private async Task RequestVote(IPeer peer, BlockingCollection requestVoteResponses) { - var requestVoteResponse = await peer.Request(new RequestVote(CurrentState.CurrentTerm, CurrentState.Id, _log.LastLogIndex, _log.LastLogTerm)); + var requestVoteResponse = await peer.Request(new RequestVote(CurrentState.CurrentTerm, CurrentState.Id, await _log.LastLogIndex(), await _log.LastLogTerm())); requestVoteResponses.Add(requestVoteResponse); } @@ -271,7 +271,7 @@ private async Task ApplyToStateMachine(int commitIndex, int lastApplied) while (commitIndex > lastApplied) { lastApplied++; - var log = _log.Get(lastApplied); + var log = await _log.Get(lastApplied); await _fsm.Handle(log); } @@ -303,10 +303,10 @@ private void AppendEntriesTermIsGreaterThanCurrentTerm(AppendEntries appendEntri return (null, false); } - private (RequestVoteResponse requestVoteResponse, bool shouldReturn) LastLogIndexAndLastLogTermMatchesThis(RequestVote requestVote) + private async Task<(RequestVoteResponse requestVoteResponse, bool shouldReturn)> LastLogIndexAndLastLogTermMatchesThis(RequestVote requestVote) { - if (requestVote.LastLogIndex == _log.LastLogIndex && - requestVote.LastLogTerm == _log.LastLogTerm) + if (requestVote.LastLogIndex == await _log.LastLogIndex() && + requestVote.LastLogTerm == await _log.LastLogTerm()) { CurrentState = new CurrentState(CurrentState.Id, CurrentState.CurrentTerm, requestVote.CandidateId, CurrentState.CommitIndex, CurrentState.LastApplied, CurrentState.LeaderId); BecomeFollower(); diff --git a/src/Rafty/Concensus/States/Follower.cs b/src/Rafty/Concensus/States/Follower.cs index 2ee3b68..5a4f302 100644 --- a/src/Rafty/Concensus/States/Follower.cs +++ b/src/Rafty/Concensus/States/Follower.cs @@ -55,18 +55,18 @@ public async Task Handle(AppendEntries appendEntries) return response.appendEntriesResponse; } - response = _rules.LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(appendEntries, _log, CurrentState); + response = await _rules.LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(appendEntries, _log, CurrentState); if(response.shouldReturn) { return response.appendEntriesResponse; } - _rules.DeleteAnyConflictsInLog(appendEntries, _log); + await _rules.DeleteAnyConflictsInLog(appendEntries, _log); - _rules.ApplyEntriesToLog(appendEntries, _log); + await _rules.ApplyEntriesToLog(appendEntries, _log); - var commitIndexAndLastApplied = _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState); + var commitIndexAndLastApplied = await _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState); await ApplyToStateMachine(commitIndexAndLastApplied.commitIndex, commitIndexAndLastApplied.lastApplied, appendEntries); @@ -100,7 +100,7 @@ public async Task Handle(RequestVote requestVote) return response.requestVoteResponse; } - response = LastLogIndexAndLastLogTermMatchesThis(requestVote); + response = await LastLogIndexAndLastLogTermMatchesThis(requestVote); _messagesSinceLastElectionExpiry++; @@ -140,10 +140,10 @@ public void Stop() return (null, false); } - private (RequestVoteResponse requestVoteResponse, bool shouldReturn) LastLogIndexAndLastLogTermMatchesThis(RequestVote requestVote) + private async Task<(RequestVoteResponse requestVoteResponse, bool shouldReturn)> LastLogIndexAndLastLogTermMatchesThis(RequestVote requestVote) { - if (requestVote.LastLogIndex == _log.LastLogIndex && - requestVote.LastLogTerm == _log.LastLogTerm) + if (requestVote.LastLogIndex == await _log.LastLogIndex() && + requestVote.LastLogTerm == await _log.LastLogTerm()) { CurrentState = new CurrentState(CurrentState.Id, CurrentState.CurrentTerm, requestVote.CandidateId, CurrentState.CommitIndex, CurrentState.LastApplied, CurrentState.LeaderId); @@ -158,7 +158,7 @@ private async Task ApplyToStateMachine(int commitIndex, int lastApplied, AppendE while (commitIndex > lastApplied) { lastApplied++; - var log = _log.Get(lastApplied); + var log = await _log.Get(lastApplied); await _fsm.Handle(log); } diff --git a/src/Rafty/Concensus/States/Leader.cs b/src/Rafty/Concensus/States/Leader.cs index 0cf6bb6..ab3b8a0 100644 --- a/src/Rafty/Concensus/States/Leader.cs +++ b/src/Rafty/Concensus/States/Leader.cs @@ -58,13 +58,13 @@ public void Stop() public async Task> Accept(T command) where T : ICommand { - var indexOfCommand = AddCommandToLog(command); + var indexOfCommand = await AddCommandToLog(command); var peers = _getPeers(CurrentState); if(No(peers)) { - var log = _log.Get(indexOfCommand); + var log = await _log.Get(indexOfCommand); await ApplyToStateMachineAndUpdateCommitIndex(log); return Ok(command); } @@ -76,7 +76,7 @@ public async Task Handle(AppendEntries appendEntries) { if (appendEntries.Term > CurrentState.CurrentTerm) { - var response = _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState); + var response = await _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState); await ApplyToStateMachine(appendEntries, response.commitIndex, response.lastApplied); @@ -111,7 +111,7 @@ private ConcurrentBag SetUpAppendingEntries() private async Task GetAppendEntriesResponse(PeerState p, List<(int, LogEntry logEntry)> logsToSend) { - var appendEntriesResponse = await p.Peer.Request(new AppendEntries(CurrentState.CurrentTerm, CurrentState.Id, _log.LastLogIndex, _log.LastLogTerm, logsToSend.Select(x => x.logEntry).ToList(), CurrentState.CommitIndex)); + var appendEntriesResponse = await p.Peer.Request(new AppendEntries(CurrentState.CurrentTerm, CurrentState.Id, await _log.LastLogIndex(), await _log.LastLogTerm(), logsToSend.Select(x => x.logEntry).ToList(), CurrentState.CommitIndex)); return appendEntriesResponse; } @@ -145,7 +145,7 @@ private void UpdateIndexes(PeerState peer, List<(int index, LogEntry logEntry)> } } - private void SendAppendEntries() + private async Task SendAppendEntries() { if(_appendingEntries == true) { @@ -166,9 +166,9 @@ private void SendAppendEntries() { var peersNotInPeerStates = peers.Where(p => !PeerStates.Select(x => x.Peer.Id).Contains(p.Id)).ToList(); - peersNotInPeerStates.ForEach(p => { + peersNotInPeerStates.ForEach(async p => { var matchIndex = new MatchIndex(p, 0); - var nextIndex = new NextIndex(p, _log.LastLogIndex); + var nextIndex = new NextIndex(p, await _log.LastLogIndex()); PeerStates.Add(new PeerState(p, matchIndex, nextIndex)); }); } @@ -177,7 +177,7 @@ private void SendAppendEntries() async Task Do(PeerState peer) { - var logsToSend = GetLogsForPeer(peer.NextIndex); + var logsToSend = await GetLogsForPeer(peer.NextIndex); var appendEntriesResponse = await GetAppendEntriesResponse(peer, logsToSend); @@ -199,7 +199,7 @@ async Task Do(PeerState peer) return; } - UpdateCommitIndex(); + await UpdateCommitIndex(); _appendingEntries = false; } @@ -216,7 +216,7 @@ async Task Do(PeerState peer) return (false, 0); } - private void UpdateCommitIndex() + private async Task UpdateCommitIndex() { var nextCommitIndex = CurrentState.CommitIndex + 1; var statesIndexOfHighestKnownReplicatedLogs = PeerStates.Select(x => x.MatchIndex.IndexOfHighestKnownReplicatedLog).ToList(); @@ -224,7 +224,7 @@ private void UpdateCommitIndex() var lessThanN = statesIndexOfHighestKnownReplicatedLogs.Where(x => x < nextCommitIndex).ToList(); if (greaterOrEqualToN.Count > lessThanN.Count) { - if (_log.GetTermAtIndex(nextCommitIndex) == CurrentState.CurrentTerm) + if (await _log.GetTermAtIndex(nextCommitIndex) == CurrentState.CurrentTerm) { CurrentState = new CurrentState(CurrentState.Id, CurrentState.CurrentTerm, CurrentState.VotedFor, nextCommitIndex, CurrentState.LastApplied, CurrentState.LeaderId); @@ -235,9 +235,9 @@ private void UpdateCommitIndex() private void ResetElectionTimer() { _electionTimer?.Dispose(); - _electionTimer = new Timer(x => + _electionTimer = new Timer(async x => { - SendAppendEntries(); + await SendAppendEntries(); }, null, 0, Convert.ToInt32(_settings.HeartbeatTimeout)); } @@ -246,17 +246,17 @@ private void InitialisePeerStates() { PeerStates = new List(); var peers = _getPeers(CurrentState); - peers.ForEach(p => { + peers.ForEach(async p => { var matchIndex = new MatchIndex(p, 0); - var nextIndex = new NextIndex(p, _log.LastLogIndex); + var nextIndex = new NextIndex(p, await _log.LastLogIndex()); PeerStates.Add(new PeerState(p, matchIndex, nextIndex)); }); } - private int AddCommandToLog(T command) where T : ICommand + private async Task AddCommandToLog(T command) where T : ICommand { var log = new LogEntry(command, command.GetType(), CurrentState.CurrentTerm); - var index = _log.Apply(log); + var index = await _log.Apply(log); return index; } @@ -293,13 +293,13 @@ private void Wait() Thread.Sleep(_settings.HeartbeatTimeout); } - private List<(int index ,LogEntry logEntry)> GetLogsForPeer(NextIndex nextIndex) + private async Task> GetLogsForPeer(NextIndex nextIndex) { - if (_log.Count > 0) + if (await _log.Count() > 0) { - if (_log.LastLogIndex >= nextIndex.NextLogIndexToSendToPeer) + if (await _log.LastLogIndex() >= nextIndex.NextLogIndexToSendToPeer) { - var logs = _log.GetFrom(nextIndex.NextLogIndexToSendToPeer); + var logs = await _log.GetFrom(nextIndex.NextLogIndexToSendToPeer); return logs; } } @@ -325,7 +325,7 @@ private async Task ApplyToStateMachine(AppendEntries appendEntries, int commitIn while (commitIndex > lastApplied) { lastApplied++; - var log = _log.Get(lastApplied); + var log = await _log.Get(lastApplied); await _fsm.Handle(log); } @@ -351,7 +351,7 @@ private bool No(List peers) private async Task ApplyToStateMachineAndUpdateCommitIndex(LogEntry log) { var nextCommitIndex = CurrentState.CommitIndex + 1; - if (_log.GetTermAtIndex(nextCommitIndex) == CurrentState.CurrentTerm) + if (await _log.GetTermAtIndex(nextCommitIndex) == CurrentState.CurrentTerm) { CurrentState = new CurrentState(CurrentState.Id, CurrentState.CurrentTerm, CurrentState.VotedFor, nextCommitIndex, CurrentState.LastApplied, CurrentState.LeaderId); @@ -365,10 +365,10 @@ private OkResponse Ok(T command) return new OkResponse(command); } - private ErrorResponse UnableDueToTimeout(T command, int indexOfCommand) + private async Task> UnableDueToTimeout(T command, int indexOfCommand) { DecrementIndexesOfAnyPeersCommandReplicatedTo(indexOfCommand); - _log.Remove(indexOfCommand); + await _log.Remove(indexOfCommand); _appendingEntries = false; return new ErrorResponse("Unable to replicate command to peers due to timeout.", command); } @@ -392,7 +392,7 @@ private async Task> Replicate(T command, int indexOfCommand) { if(ReplicationTimeout()) { - return UnableDueToTimeout(command, indexOfCommand); + return await UnableDueToTimeout(command, indexOfCommand); } var replicated = 0; @@ -406,7 +406,7 @@ private async Task> Replicate(T command, int indexOfCommand) if (ReplicatedToMajority(replicated)) { - var log = _log.Get(indexOfCommand); + var log = await _log.Get(indexOfCommand); await _fsm.Handle(log); FinishWaitingForCommandToReplicate(); break; diff --git a/src/Rafty/Concensus/States/Rules.cs b/src/Rafty/Concensus/States/Rules.cs index caae8ea..be2c390 100644 --- a/src/Rafty/Concensus/States/Rules.cs +++ b/src/Rafty/Concensus/States/Rules.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using Rafty.Log; namespace Rafty.Concensus.States @@ -6,10 +7,10 @@ namespace Rafty.Concensus.States public interface IRules { (AppendEntriesResponse appendEntriesResponse, bool shouldReturn) AppendEntriesTermIsLessThanCurrentTerm(AppendEntries appendEntries, CurrentState currentState); - (AppendEntriesResponse appendEntriesResponse, bool shouldReturn) LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(AppendEntries appendEntries, ILog log, CurrentState currentState); - void DeleteAnyConflictsInLog(AppendEntries appendEntries, ILog log); - void ApplyEntriesToLog(AppendEntries appendEntries, ILog log); - (int commitIndex, int lastApplied) CommitIndexAndLastApplied(AppendEntries appendEntries, ILog log, CurrentState currentState); + Task<(AppendEntriesResponse appendEntriesResponse, bool shouldReturn)> LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(AppendEntries appendEntries, ILog log, CurrentState currentState); + Task DeleteAnyConflictsInLog(AppendEntries appendEntries, ILog log); + Task ApplyEntriesToLog(AppendEntries appendEntries, ILog log); + Task<(int commitIndex, int lastApplied)> CommitIndexAndLastApplied(AppendEntries appendEntries, ILog log, CurrentState currentState); (RequestVoteResponse requestVoteResponse, bool shouldReturn) RequestVoteTermIsLessThanCurrentTerm(RequestVote requestVote, CurrentState currentState); (RequestVoteResponse requestVoteResponse, bool shouldReturn) VotedForIsNotThisOrNobody(RequestVote requestVote, CurrentState currentState); } @@ -39,34 +40,34 @@ public class Rules : IRules } // todo - inject as function into candidate and follower as logic is the same... - public (int commitIndex, int lastApplied) CommitIndexAndLastApplied(AppendEntries appendEntries, ILog log, CurrentState currentState) + public async Task<(int commitIndex, int lastApplied)> CommitIndexAndLastApplied(AppendEntries appendEntries, ILog log, CurrentState currentState) { var commitIndex = currentState.CommitIndex; var lastApplied = currentState.LastApplied; if (appendEntries.LeaderCommitIndex > currentState.CommitIndex) { - var lastNewEntry = log.LastLogIndex; + var lastNewEntry = await log.LastLogIndex(); commitIndex = System.Math.Min(appendEntries.LeaderCommitIndex, lastNewEntry); } return (commitIndex, lastApplied); } // todo - inject as function into candidate and follower as logic is the same... - public void ApplyEntriesToLog(AppendEntries appendEntries, ILog log) + public async Task ApplyEntriesToLog(AppendEntries appendEntries, ILog log) { foreach (var entry in appendEntries.Entries) { - log.Apply(entry); + await log.Apply(entry); } } // todo - inject as function into candidate and follower as logic is the same... - public void DeleteAnyConflictsInLog(AppendEntries appendEntries, ILog log) + public async Task DeleteAnyConflictsInLog(AppendEntries appendEntries, ILog log) { var count = 1; foreach (var newLog in appendEntries.Entries) { - log.DeleteConflictsFromThisLog(appendEntries.PreviousLogIndex + 1, newLog); + await log.DeleteConflictsFromThisLog(appendEntries.PreviousLogIndex + 1, newLog); count++; } } @@ -83,9 +84,9 @@ public void DeleteAnyConflictsInLog(AppendEntries appendEntries, ILog log) } // todo - inject as function into candidate and follower as logic is the same... - public (AppendEntriesResponse appendEntriesResponse, bool shouldReturn) LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(AppendEntries appendEntries, ILog log, CurrentState currentState) + public async Task<(AppendEntriesResponse appendEntriesResponse, bool shouldReturn)> LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(AppendEntries appendEntries, ILog log, CurrentState currentState) { - var termAtPreviousLogIndex = log.GetTermAtIndex(appendEntries.PreviousLogIndex); + var termAtPreviousLogIndex = await log.GetTermAtIndex(appendEntries.PreviousLogIndex); if (termAtPreviousLogIndex > 0 && termAtPreviousLogIndex != appendEntries.PreviousLogTerm) { return (new AppendEntriesResponse(currentState.CurrentTerm, false), true); diff --git a/src/Rafty/Log/ILog.cs b/src/Rafty/Log/ILog.cs index dd80810..8633097 100644 --- a/src/Rafty/Log/ILog.cs +++ b/src/Rafty/Log/ILog.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading.Tasks; namespace Rafty.Log { @@ -8,39 +9,39 @@ public interface ILog /// /// This will apply a log entry and return its index /// - int Apply(LogEntry log); + Task Apply(LogEntry log); /// /// This will return the log entry at the index passed in /// - LogEntry Get(int index); + Task Get(int index); /// /// This will return all the log entries from a certain point based on index including the first match on the index passed in /// - List<(int index, LogEntry logEntry)> GetFrom(int index); + Task> GetFrom(int index); /// /// This will return the last known log index or 1 /// - int LastLogIndex {get;} + Task LastLogIndex(); /// /// This will return the last know log term or 0 /// - long LastLogTerm {get;} + Task LastLogTerm(); /// /// This will get the term at the index passed in /// - long GetTermAtIndex(int index); + Task GetTermAtIndex(int index); /// /// This will delete any conflicts from the log, if the log entry passed in doesnt match the log entry //in the log for the given index it will also delete any further logs /// - void DeleteConflictsFromThisLog(int index, LogEntry logEntry); + Task DeleteConflictsFromThisLog(int index, LogEntry logEntry); /// /// This returns a count of the logs /// - int Count { get; } + Task Count(); /// /// This removes the command at the index passed in. /// - void Remove(int indexOfCommand); + Task Remove(int indexOfCommand); } } \ No newline at end of file diff --git a/src/Rafty/Log/InMemoryLog.cs b/src/Rafty/Log/InMemoryLog.cs index 55cf39a..31c1815 100644 --- a/src/Rafty/Log/InMemoryLog.cs +++ b/src/Rafty/Log/InMemoryLog.cs @@ -4,6 +4,7 @@ namespace Rafty.Log { using System; using System.Collections.Generic; + using System.Threading.Tasks; using Rafty.Infrastructure; public class InMemoryLog : ILog @@ -17,7 +18,7 @@ public InMemoryLog() public Dictionary ExposedForTesting => _log; - public List<(int index, LogEntry logEntry)> GetFrom(int index) + public Task> GetFrom(int index) { var logsToReturn = new List<(int, LogEntry)>(); @@ -29,64 +30,58 @@ public InMemoryLog() } } - return logsToReturn; + return Task.FromResult(logsToReturn); } - public int LastLogIndex + public Task LastLogIndex() { - get - { var lastLog = _log.LastOrDefault(); if(lastLog.Value != null) { - return lastLog.Key; + return Task.FromResult(lastLog.Key); } - return 1; - } + return Task.FromResult(1); } - public long LastLogTerm + public Task LastLogTerm() { - get - { - var lastLog = _log.LastOrDefault(); + var lastLog = _log.LastOrDefault(); if(lastLog.Value != null) { - return lastLog.Value.Term; + return Task.FromResult(lastLog.Value.Term); } - return 0; - } + return Task.FromResult((long)0); } - public int Apply(LogEntry logEntry) + public Task Apply(LogEntry logEntry) { if(_log.Count <= 0) { _log.Add(1, logEntry); - return 1; + return Task.FromResult(1); } else { var nextIndex = _log.Max(x => x.Key) + 1; _log.Add(nextIndex, logEntry); - return nextIndex; + return Task.FromResult(nextIndex); } } - public long GetTermAtIndex(int index) + public Task GetTermAtIndex(int index) { if(_log.Count == 0) { - return 0; + return Task.FromResult((long)0); } if(index > _log.Count) { - return 0; + return Task.FromResult((long)0); } if (index <= 0) @@ -94,14 +89,14 @@ public long GetTermAtIndex(int index) throw new Exception("Log starts at 1..."); } - return _log[index].Term; + return Task.FromResult(_log[index].Term); } - public void DeleteConflictsFromThisLog(int logIndex, LogEntry logEntry) + public Task DeleteConflictsFromThisLog(int logIndex, LogEntry logEntry) { if(logIndex > 1 && logIndex > _log.Count -1) { - return; + return Task.CompletedTask;; } for (int i = logIndex; i <= _log.Max(x => x.Key); i++) @@ -114,20 +109,24 @@ public void DeleteConflictsFromThisLog(int logIndex, LogEntry logEntry) break; } } + + return Task.CompletedTask; } - private void RemoveRange(int indexToRemove, int toRemove) + private Task RemoveRange(int indexToRemove, int toRemove) { while(_log.ContainsKey(indexToRemove)) { _log.Remove(indexToRemove); indexToRemove++; } + + return Task.CompletedTask; } - public int Count => _log.Count; + public Task Count() => Task.FromResult(_log.Count); - public LogEntry Get(int index) + public Task Get(int index) { if (index <= 0) { @@ -136,15 +135,16 @@ public LogEntry Get(int index) if(_log.Count >= index) { - return _log[index]; + return Task.FromResult(_log[index]); } throw new Exception("Nothing in log.."); } - public void Remove(int indexOfCommand) + public Task Remove(int indexOfCommand) { _log.Remove(indexOfCommand); + return Task.CompletedTask; } } } \ No newline at end of file diff --git a/test/Rafty.AcceptanceTests/Tests.cs b/test/Rafty.AcceptanceTests/Tests.cs index 182dd13..96786b4 100644 --- a/test/Rafty.AcceptanceTests/Tests.cs +++ b/test/Rafty.AcceptanceTests/Tests.cs @@ -61,7 +61,7 @@ public async Task ShouldRunInSoloModeAcceptCommandThenAddNewServersThatBecomeFol await SendCommandToLeader(); AddNewServers(4); AssertLeaderElected(4); - AssertCommandAccepted(1, 4); + await AssertCommandAccepted(1, 4); } [Fact] @@ -74,7 +74,7 @@ public async Task ShouldRunInSoloModeThenAddNewServersThatBecomeFollowersAndComm AddNewServers(4); AssertLeaderElected(4); await SendCommandToLeader(); - AssertCommandAccepted(1, 4); + await AssertCommandAccepted(1, 4); } [Fact] @@ -125,7 +125,7 @@ public async Task LeaderShouldAcceptCommandThenPersistToFollowersAndApplyToState StartNodes(); AssertLeaderElected(4); await SendCommandToLeader(); - AssertCommandAccepted(1, 4); + await AssertCommandAccepted(1, 4); } [Fact] @@ -136,7 +136,7 @@ public async Task FollowerShouldForwardCommandToLeaderThenPersistToFollowersAndA StartNodes(); AssertLeaderElected(4); await SendCommandToFollower(); - AssertCommandAccepted(1, 4); + await AssertCommandAccepted(1, 4); } [Fact] @@ -147,13 +147,13 @@ public async Task LeaderShouldAcceptManyCommandsThenPersistToFollowersAndApplyTo StartNodes(); AssertLeaderElected(4); await SendCommandToLeader(); - AssertCommandAccepted(1, 4); + await AssertCommandAccepted(1, 4); await SendCommandToLeader(); - AssertCommandAccepted(2, 4); + await AssertCommandAccepted(2, 4); await SendCommandToLeader(); - AssertCommandAccepted(3, 4); + await AssertCommandAccepted(3, 4); await SendCommandToLeader(); - AssertCommandAccepted(4, 4); + await AssertCommandAccepted(4, 4); } [Fact] @@ -165,12 +165,12 @@ public async Task ShouldCatchUpIfNodeDies() KillTheLeader(); AssertLeaderElected(3); await SendCommandToLeader(); - AssertCommandAccepted(1, 3); + await AssertCommandAccepted(1, 3); BringPreviousLeaderBackToLife(); AssertLeaderElected(4); - AssertCommandAccepted(1, 4); + await AssertCommandAccepted(1, 4); await SendCommandToLeader(); - AssertCommandAccepted(2, 4); + await AssertCommandAccepted(2, 4); } private void AddNewServers(int count) @@ -218,19 +218,19 @@ async Task SendCommand() sentCommand.ShouldBeTrue(); } - private void AssertCommandAccepted(int expectedReplicatedCount, int expectedFollowers) + private async Task AssertCommandAccepted(int expectedReplicatedCount, int expectedFollowers) { - bool IsReplicatedToLeader(KeyValuePair server) + async Task IsReplicatedToLeader(KeyValuePair server) { - return server.Value.Log.Count == expectedReplicatedCount && + return await server.Value.Log.Count() == expectedReplicatedCount && server.Value.Fsm.HandledLogEntries == expectedReplicatedCount; } var leaderServer = GetLeader(); - var appliedToLeaderFsm = WaitFor(25000).Until(() => IsReplicatedToLeader(leaderServer)); + var appliedToLeaderFsm = await WaitFor(25000).Until(() => IsReplicatedToLeader(leaderServer)); appliedToLeaderFsm.ShouldBeTrue(); - bool IsReplicatedToFollowers() + async Task IsReplicatedToFollowers() { var followers = _servers .Select(x => x.Value) @@ -244,7 +244,7 @@ bool IsReplicatedToFollowers() foreach(var follower in followers) { - if(follower.Log.Count != expectedReplicatedCount) + if(await follower.Log.Count() != expectedReplicatedCount) { return false; } @@ -258,7 +258,7 @@ bool IsReplicatedToFollowers() return true; } - var appliedToFollowersFsm = WaitFor(25000).Until(() => IsReplicatedToFollowers()); + var appliedToFollowersFsm = await WaitFor(25000).Until(() => IsReplicatedToFollowers()); appliedToFollowersFsm.ShouldBeTrue(); } diff --git a/test/Rafty.IntegrationTests/SqlLiteLog.cs b/test/Rafty.IntegrationTests/SqlLiteLog.cs index 3d1d953..4e6eab3 100644 --- a/test/Rafty.IntegrationTests/SqlLiteLog.cs +++ b/test/Rafty.IntegrationTests/SqlLiteLog.cs @@ -5,24 +5,24 @@ using System; using Rafty.Infrastructure; using System.Collections.Generic; +using System.Threading.Tasks; +using System.Threading; namespace Rafty.IntegrationTests { public class SqlLiteLog : ILog { private string _path; - private readonly object _lock = new object(); + private readonly SemaphoreSlim _sempaphore = new SemaphoreSlim(1,1); public SqlLiteLog(NodeId nodeId) { _path = $"{nodeId.Id.Replace("/","").Replace(":","").ToString()}.db"; if(!File.Exists(_path)) { - lock(_lock) - { - FileStream fs = File.Create(_path); - fs.Dispose(); - } + FileStream fs = File.Create(_path); + fs.Dispose(); + using(var connection = new SqliteConnection($"Data Source={_path};")) { connection.Open(); @@ -38,241 +38,222 @@ data text not null } } - public int LastLogIndex + public async Task LastLogIndex() { - get + _sempaphore.Wait(); + var result = 1; + using(var connection = new SqliteConnection($"Data Source={_path};")) { - lock(_lock) + connection.Open(); + var sql = @"select id from logs order by id desc limit 1"; + using(var command = new SqliteCommand(sql, connection)) { - var result = 1; - using(var connection = new SqliteConnection($"Data Source={_path};")) + var index = Convert.ToInt32(await command.ExecuteScalarAsync()); + if(index > result) { - connection.Open(); - var sql = @"select id from logs order by id desc limit 1"; - using(var command = new SqliteCommand(sql, connection)) - { - var index = Convert.ToInt32(command.ExecuteScalar()); - if(index > result) - { - result = index; - } - } + result = index; } - return result; } } + + _sempaphore.Release(); + return result; } - public long LastLogTerm + public async Task LastLogTerm() { - get + _sempaphore.Wait(); + long result = 0; + using(var connection = new SqliteConnection($"Data Source={_path};")) { - lock(_lock) + connection.Open(); + var sql = @"select data from logs order by id desc limit 1"; + using(var command = new SqliteCommand(sql, connection)) { - long result = 0; - using(var connection = new SqliteConnection($"Data Source={_path};")) + var data = Convert.ToString(await command.ExecuteScalarAsync()); + var jsonSerializerSettings = new JsonSerializerSettings() { + TypeNameHandling = TypeNameHandling.All + }; + var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); + if(log != null && log.Term > result) { - connection.Open(); - var sql = @"select data from logs order by id desc limit 1"; - using(var command = new SqliteCommand(sql, connection)) - { - var data = Convert.ToString(command.ExecuteScalar()); - var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); - if(log != null && log.Term > result) - { - result = log.Term; - } - } + result = log.Term; } - return result; } } + _sempaphore.Release(); + return result; } - public int Count + public async Task Count() { - get + _sempaphore.Wait(); + var result = 0; + using(var connection = new SqliteConnection($"Data Source={_path};")) { - lock(_lock) + connection.Open(); + var sql = @"select count(id) from logs"; + using(var command = new SqliteCommand(sql, connection)) { - var result = 0; - using(var connection = new SqliteConnection($"Data Source={_path};")) + var index = Convert.ToInt32(await command.ExecuteScalarAsync()); + if(index > result) { - connection.Open(); - var sql = @"select count(id) from logs"; - using(var command = new SqliteCommand(sql, connection)) - { - var index = Convert.ToInt32(command.ExecuteScalar()); - if(index > result) - { - result = index; - } - } + result = index; } - return result; } } + _sempaphore.Release(); + return result; } - public int Apply(LogEntry log) + public async Task Apply(LogEntry log) { - lock(_lock) + _sempaphore.Wait(); + using(var connection = new SqliteConnection($"Data Source={_path};")) { - using(var connection = new SqliteConnection($"Data Source={_path};")) + connection.Open(); + var jsonSerializerSettings = new JsonSerializerSettings() { + TypeNameHandling = TypeNameHandling.All + }; + var data = JsonConvert.SerializeObject(log, jsonSerializerSettings); + //todo - sql injection dont copy this.. + var sql = $"insert into logs (data) values ('{data}')"; + using(var command = new SqliteCommand(sql, connection)) { - connection.Open(); - var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - var data = JsonConvert.SerializeObject(log, jsonSerializerSettings); - //todo - sql injection dont copy this.. - var sql = $"insert into logs (data) values ('{data}')"; - using(var command = new SqliteCommand(sql, connection)) - { - var result = command.ExecuteNonQuery(); - } - - sql = "select last_insert_rowid()"; - using(var command = new SqliteCommand(sql, connection)) - { - var result = command.ExecuteScalar(); - return Convert.ToInt32(result); - } + var result = await command.ExecuteNonQueryAsync(); } + + sql = "select last_insert_rowid()"; + using(var command = new SqliteCommand(sql, connection)) + { + var result = await command.ExecuteScalarAsync(); + _sempaphore.Release(); + return Convert.ToInt32(result); + } } } - public void DeleteConflictsFromThisLog(int index, LogEntry logEntry) + public async Task DeleteConflictsFromThisLog(int index, LogEntry logEntry) { - lock(_lock) + _sempaphore.Wait(); + using(var connection = new SqliteConnection($"Data Source={_path};")) { - using(var connection = new SqliteConnection($"Data Source={_path};")) + connection.Open(); + //todo - sql injection dont copy this.. + var sql = $"select data from logs where id = {index};"; + using(var command = new SqliteCommand(sql, connection)) { - connection.Open(); - //todo - sql injection dont copy this.. - var sql = $"select data from logs where id = {index};"; - using(var command = new SqliteCommand(sql, connection)) + var data = Convert.ToString(await command.ExecuteScalarAsync()); + var jsonSerializerSettings = new JsonSerializerSettings() { + TypeNameHandling = TypeNameHandling.All + }; + var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); + if(logEntry != null && log != null && logEntry.Term != log.Term) { - var data = Convert.ToString(command.ExecuteScalar()); - var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); - if(logEntry != null && log != null && logEntry.Term != log.Term) + //todo - sql injection dont copy this.. + var deleteSql = $"delete from logs where id >= {index};"; + using(var deleteCommand = new SqliteCommand(deleteSql, connection)) { - //todo - sql injection dont copy this.. - var deleteSql = $"delete from logs where id >= {index};"; - using(var deleteCommand = new SqliteCommand(deleteSql, connection)) - { - var result = deleteCommand.ExecuteNonQuery(); - } + var result = await deleteCommand.ExecuteNonQueryAsync(); } } } } + _sempaphore.Release(); } - public LogEntry Get(int index) + public async Task Get(int index) { - lock(_lock) + _sempaphore.Wait(); + using(var connection = new SqliteConnection($"Data Source={_path};")) { - using(var connection = new SqliteConnection($"Data Source={_path};")) + connection.Open(); + //todo - sql injection dont copy this.. + var sql = $"select data from logs where id = {index}"; + using(var command = new SqliteCommand(sql, connection)) { - connection.Open(); - //todo - sql injection dont copy this.. - var sql = $"select data from logs where id = {index}"; - using(var command = new SqliteCommand(sql, connection)) - { - var data = Convert.ToString(command.ExecuteScalar()); - var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); - return log; - } + var data = Convert.ToString(await command.ExecuteScalarAsync()); + var jsonSerializerSettings = new JsonSerializerSettings() { + TypeNameHandling = TypeNameHandling.All + }; + var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); + _sempaphore.Release(); + return log; } } } - public System.Collections.Generic.List<(int index, LogEntry logEntry)> GetFrom(int index) + public async Task> GetFrom(int index) { - lock(_lock) - { - var logsToReturn = new List<(int, LogEntry)>(); + _sempaphore.Wait(); + var logsToReturn = new List<(int, LogEntry)>(); - using(var connection = new SqliteConnection($"Data Source={_path};")) + using(var connection = new SqliteConnection($"Data Source={_path};")) + { + connection.Open(); + //todo - sql injection dont copy this.. + var sql = $"select id, data from logs where id >= {index}"; + using(var command = new SqliteCommand(sql, connection)) { - connection.Open(); - //todo - sql injection dont copy this.. - var sql = $"select id, data from logs where id >= {index}"; - using(var command = new SqliteCommand(sql, connection)) + using(var reader = await command.ExecuteReaderAsync()) { - using(var reader = command.ExecuteReader()) + while(reader.Read()) { - while(reader.Read()) - { - var id = Convert.ToInt32(reader[0]); - var data = (string)reader[1]; - var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); - logsToReturn.Add((id, log)); + var id = Convert.ToInt32(reader[0]); + var data = (string)reader[1]; + var jsonSerializerSettings = new JsonSerializerSettings() { + TypeNameHandling = TypeNameHandling.All + }; + var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); + logsToReturn.Add((id, log)); - } } } } - - return logsToReturn; + _sempaphore.Release(); + return logsToReturn; } - } - public long GetTermAtIndex(int index) + public async Task GetTermAtIndex(int index) { - lock(_lock) + _sempaphore.Wait(); + long result = 0; + using(var connection = new SqliteConnection($"Data Source={_path};")) { - long result = 0; - using(var connection = new SqliteConnection($"Data Source={_path};")) + connection.Open(); + //todo - sql injection dont copy this.. + var sql = $"select data from logs where id = {index}"; + using(var command = new SqliteCommand(sql, connection)) { - connection.Open(); - //todo - sql injection dont copy this.. - var sql = $"select data from logs where id = {index}"; - using(var command = new SqliteCommand(sql, connection)) + var data = Convert.ToString(await command.ExecuteScalarAsync()); + var jsonSerializerSettings = new JsonSerializerSettings() { + TypeNameHandling = TypeNameHandling.All + }; + var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); + if(log != null && log.Term > result) { - var data = Convert.ToString(command.ExecuteScalar()); - var jsonSerializerSettings = new JsonSerializerSettings() { - TypeNameHandling = TypeNameHandling.All - }; - var log = JsonConvert.DeserializeObject(data, jsonSerializerSettings); - if(log != null && log.Term > result) - { - result = log.Term; - } + result = log.Term; } } - return result; } + _sempaphore.Release(); + return result; } - public void Remove(int indexOfCommand) + public async Task Remove(int indexOfCommand) { - lock(_lock) + _sempaphore.Wait(); + using(var connection = new SqliteConnection($"Data Source={_path};")) { - using(var connection = new SqliteConnection($"Data Source={_path};")) + connection.Open(); + //todo - sql injection dont copy this.. + var deleteSql = $"delete from logs where id >= {indexOfCommand};"; + using(var deleteCommand = new SqliteCommand(deleteSql, connection)) { - connection.Open(); - //todo - sql injection dont copy this.. - var deleteSql = $"delete from logs where id >= {indexOfCommand};"; - using(var deleteCommand = new SqliteCommand(deleteSql, connection)) - { - var result = deleteCommand.ExecuteNonQuery(); - } + var result = await deleteCommand.ExecuteNonQueryAsync(); } } + _sempaphore.Release(); } } } \ No newline at end of file diff --git a/test/Rafty.IntegrationTests/SqlLiteLogTests.cs b/test/Rafty.IntegrationTests/SqlLiteLogTests.cs index f0e7e64..e9b16b7 100644 --- a/test/Rafty.IntegrationTests/SqlLiteLogTests.cs +++ b/test/Rafty.IntegrationTests/SqlLiteLogTests.cs @@ -2,6 +2,7 @@ namespace Rafty.UnitTests { using System; using System.IO; + using System.Threading.Tasks; using Log; using Rafty.Infrastructure; using Rafty.IntegrationTests; @@ -23,98 +24,98 @@ public SqlLiteLogTests() public void ShouldInitialiseCorrectly() { var path = Guid.NewGuid().ToString(); - _log.LastLogIndex.ShouldBe(1); - _log.LastLogTerm.ShouldBe(0); + _log.LastLogIndex().Result.ShouldBe(1); + _log.LastLogTerm().Result.ShouldBe(0); } [Fact] - public void ShouldApplyLog() + public async Task ShouldApplyLog() { - var index = _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + var index = await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); index.ShouldBe(1); } [Fact] - public void ShouldSetLastLogIndex() + public async Task ShouldSetLastLogIndex() { - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.LastLogIndex.ShouldBe(2); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + _log.LastLogIndex().Result.ShouldBe(2); } [Fact] - public void ShouldSetLastLogTerm() + public async Task ShouldSetLastLogTerm() { - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 2)); - _log.LastLogTerm.ShouldBe(2); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 2)); + _log.LastLogTerm().Result.ShouldBe(2); } [Fact] - public void ShouldGetTermAtIndex() + public async Task ShouldGetTermAtIndex() { - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.GetTermAtIndex(1).ShouldBe(1); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + _log.GetTermAtIndex(1).Result.ShouldBe(1); } [Fact] - public void ShouldDeleteConflict() + public async Task ShouldDeleteConflict() { - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 2)); - _log.Count.ShouldBe(0); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 2)); + _log.Count().Result.ShouldBe(0); } [Fact] - public void ShouldNotDeleteConflict() + public async Task ShouldNotDeleteConflict() { - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Count.ShouldBe(1); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 1)); + _log.Count().Result.ShouldBe(1); } [Fact] - public void ShouldDeleteConflictAndSubsequentLogs() + public async Task ShouldDeleteConflictAndSubsequentLogs() { - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 2)); - _log.Count.ShouldBe(0); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 2)); + _log.Count().Result.ShouldBe(0); } [Fact] - public void ShouldDeleteConflictAndSubsequentLogsFromMidPoint() + public async Task ShouldDeleteConflictAndSubsequentLogsFromMidPoint() { - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.DeleteConflictsFromThisLog(4, new LogEntry(new FakeCommand("test"), typeof(string), 2)); - _log.Count.ShouldBe(3); - _log.Get(1).Term.ShouldBe(1); - _log.Get(2).Term.ShouldBe(1); - _log.Get(3).Term.ShouldBe(1); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.DeleteConflictsFromThisLog(4, new LogEntry(new FakeCommand("test"), typeof(string), 2)); + _log.Count().Result.ShouldBe(3); + _log.Get(1).Result.Term.ShouldBe(1); + _log.Get(2).Result.Term.ShouldBe(1); + _log.Get(3).Result.Term.ShouldBe(1); } [Fact] - public void ShouldGetFrom() + public async Task ShouldGetFrom() { - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - var logs = _log.GetFrom(3); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + var logs = await _log.GetFrom(3); logs.Count.ShouldBe(3); } [Fact] - public void ShouldRemoveFromLog() + public async Task ShouldRemoveFromLog() { - var index = _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - _log.Remove(index); - _log.Count.ShouldBe(0); + var index = await _log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await _log.Remove(index); + _log.Count().Result.ShouldBe(0); } public void Dispose() { diff --git a/test/Rafty.IntegrationTests/Tests.cs b/test/Rafty.IntegrationTests/Tests.cs index c73638c..a138693 100644 --- a/test/Rafty.IntegrationTests/Tests.cs +++ b/test/Rafty.IntegrationTests/Tests.cs @@ -130,7 +130,7 @@ async Task CommandCalledOnAllStateMachines() return false; } } - + Thread.Sleep(5000); var commandOnAllStateMachines = await WaitFor(20000).Until(async () => await CommandCalledOnAllStateMachines()); commandOnAllStateMachines.ShouldBeTrue(); } diff --git a/test/Rafty.UnitTests/AllServersApplyToStateMachineTests.cs b/test/Rafty.UnitTests/AllServersApplyToStateMachineTests.cs index 3e2724d..f648829 100644 --- a/test/Rafty.UnitTests/AllServersApplyToStateMachineTests.cs +++ b/test/Rafty.UnitTests/AllServersApplyToStateMachineTests.cs @@ -7,6 +7,7 @@ using Rafty.FiniteStateMachine; using Rafty.Log; using Rafty.Concensus.States; +using System.Threading.Tasks; namespace Rafty.UnitTests { @@ -35,7 +36,7 @@ public AllServersApplyToStateMachineTests() } [Fact] - public void FollowerShouldApplyLogsToFsm() + public async Task FollowerShouldApplyLogsToFsm() { var currentState = new CurrentState(Guid.NewGuid().ToString(), 0, default(string), 0, 0, default(string)); var fsm = new Rafty.FiniteStateMachine.InMemoryStateMachine(); @@ -49,7 +50,7 @@ public void FollowerShouldApplyLogsToFsm() .WithEntry(log) .Build(); //assume node has added the log.. - _log.Apply(log); + await _log.Apply(log); var appendEntriesResponse = follower.Handle(appendEntries); follower.CurrentState.CurrentTerm.ShouldBe(1); follower.CurrentState.LastApplied.ShouldBe(1); @@ -57,7 +58,7 @@ public void FollowerShouldApplyLogsToFsm() } [Fact] - public void CandidateShouldApplyLogsToFsm() + public async Task CandidateShouldApplyLogsToFsm() { var currentState = new CurrentState(Guid.NewGuid().ToString(), 0, default(string), 0, 0, default(string)); var fsm = new Rafty.FiniteStateMachine.InMemoryStateMachine(); @@ -71,7 +72,7 @@ public void CandidateShouldApplyLogsToFsm() .WithLeaderCommitIndex(1) .Build(); //assume node has added the log.. - _log.Apply(log); + await _log.Apply(log); var appendEntriesResponse = candidate.Handle(appendEntries); candidate.CurrentState.CurrentTerm.ShouldBe(1); candidate.CurrentState.LastApplied.ShouldBe(1); @@ -82,7 +83,7 @@ public void CandidateShouldApplyLogsToFsm() [Fact] - public void LeaderShouldApplyLogsToFsm() + public async Task LeaderShouldApplyLogsToFsm() { var currentState = new CurrentState(Guid.NewGuid().ToString(), 0, default(string), 0, 0, default(string)); var fsm = new Rafty.FiniteStateMachine.InMemoryStateMachine(); @@ -96,7 +97,7 @@ public void LeaderShouldApplyLogsToFsm() .WithLeaderCommitIndex(1) .Build(); //assume node has added the log.. - _log.Apply(log); + await _log.Apply(log); var appendEntriesResponse = leader.Handle(appendEntries); leader.CurrentState.CurrentTerm.ShouldBe(1); leader.CurrentState.LastApplied.ShouldBe(1); diff --git a/test/Rafty.UnitTests/AppendEntriesTests.cs b/test/Rafty.UnitTests/AppendEntriesTests.cs index 5962233..adb0ff2 100644 --- a/test/Rafty.UnitTests/AppendEntriesTests.cs +++ b/test/Rafty.UnitTests/AppendEntriesTests.cs @@ -61,7 +61,7 @@ public async Task ShouldReplyFalseIfRpcTermLessThanCurrentTerm() public async Task ShouldReplyFalseIfLogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesRpcPrevLogTerm() { _currentState = new CurrentState(Guid.NewGuid().ToString(), 2, default(string), 0, 0, default(string)); - _log.Apply(new LogEntry(new FakeCommand(""), typeof(string), 2)); + await _log.Apply(new LogEntry(new FakeCommand(""), typeof(string), 2)); var appendEntriesRpc = new AppendEntriesBuilder().WithTerm(2).WithPreviousLogIndex(1).WithPreviousLogTerm(1).Build(); var follower = new Follower(_currentState, _fsm, _log, _random, _node, _settings, _rules, _peers); var appendEntriesResponse = await follower.Handle(appendEntriesRpc); @@ -73,9 +73,9 @@ public async Task ShouldReplyFalseIfLogDoesntContainEntryAtPreviousLogIndexWhose public async Task ShouldDeleteExistingEntryIfItConflictsWithNewOne() { _currentState = new CurrentState(Guid.NewGuid().ToString(), 1, default(string), 2, 0, default(string)); - _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 0"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 1"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 2"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 0"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 1"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 2"), typeof(string), 1)); var appendEntriesRpc = new AppendEntriesBuilder() .WithEntry(new LogEntry(new FakeCommand("term 2 commit index 2"), typeof(string),2)) .WithTerm(2) @@ -92,9 +92,9 @@ public async Task ShouldDeleteExistingEntryIfItConflictsWithNewOne() public async Task ShouldDeleteExistingEntryIfItConflictsWithNewOneAndAppendNewEntries() { _currentState = new CurrentState(Guid.NewGuid().ToString(), 1, default(string), 0, 0, default(string)); - _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 0"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 1"), typeof(string), 1)); - _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 2"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 0"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 1"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 2"), typeof(string), 1)); var appendEntriesRpc = new AppendEntriesBuilder() .WithEntry(new LogEntry(new FakeCommand("term 2 commit index 2"), typeof(string), 2)) .WithTerm(2) @@ -105,14 +105,14 @@ public async Task ShouldDeleteExistingEntryIfItConflictsWithNewOneAndAppendNewEn var appendEntriesResponse = await follower.Handle(appendEntriesRpc); appendEntriesResponse.Success.ShouldBe(true); appendEntriesResponse.Term.ShouldBe(2); - _log.GetTermAtIndex(2).ShouldBe(2); + _log.GetTermAtIndex(2).Result.ShouldBe(2); } [Fact] public async Task ShouldAppendAnyEntriesNotInTheLog() { _currentState = new CurrentState(Guid.NewGuid().ToString(), 1, default(string), 0, 0, default(string)); - _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 0"), typeof(string), 1)); + await _log.Apply(new LogEntry(new FakeCommand("term 1 commit index 0"), typeof(string), 1)); var appendEntriesRpc = new AppendEntriesBuilder() .WithEntry(new LogEntry(new FakeCommand("term 1 commit index 1"), typeof(string), 1)) .WithTerm(1) @@ -124,7 +124,7 @@ public async Task ShouldAppendAnyEntriesNotInTheLog() var appendEntriesResponse = await follower.Handle(appendEntriesRpc); appendEntriesResponse.Success.ShouldBe(true); appendEntriesResponse.Term.ShouldBe(1); - _log.GetTermAtIndex(1).ShouldBe(1); + _log.GetTermAtIndex(1).Result.ShouldBe(1); follower.CurrentState.LeaderId.ShouldBe(appendEntriesRpc.LeaderId); } @@ -133,7 +133,7 @@ public async Task FollowerShouldSetCommitIndexIfLeaderCommitGreaterThanCommitInd { _currentState = new CurrentState(Guid.NewGuid().ToString(), 1, default(string), 0, 0, default(string)); var log = new LogEntry(new FakeCommand("term 1 commit index 0"), typeof(string), 1); - _log.Apply(log); + await _log.Apply(log); var appendEntriesRpc = new AppendEntriesBuilder() .WithEntry(log) .WithTerm(1) @@ -153,7 +153,7 @@ public async Task CandidateShouldSetCommitIndexIfLeaderCommitGreaterThanCommitIn _currentState = new CurrentState(Guid.NewGuid().ToString(), 0, default(string), 0, 0, default(string)); //assume log applied by node? var log = new LogEntry(new FakeCommand("term 1 commit index 0"), typeof(string), 1); - _log.Apply(log); + await _log.Apply(log); var appendEntriesRpc = new AppendEntriesBuilder() .WithEntry(log) .WithTerm(1) @@ -174,7 +174,7 @@ public async Task LeaderShouldSetCommitIndexIfLeaderCommitGreaterThanCommitIndex _currentState = new CurrentState(Guid.NewGuid().ToString(), 0, default(string), 0, 0, default(string)); //assume log applied by node? var log = new LogEntry(new FakeCommand("term 1 commit index 0"), typeof(string), 1); - _log.Apply(log); + await _log.Apply(log); var appendEntriesRpc = new AppendEntriesBuilder() .WithEntry(log) .WithTerm(1) diff --git a/test/Rafty.UnitTests/InMemoryLogTests.cs b/test/Rafty.UnitTests/InMemoryLogTests.cs index 1c1f01d..f4f5e4f 100644 --- a/test/Rafty.UnitTests/InMemoryLogTests.cs +++ b/test/Rafty.UnitTests/InMemoryLogTests.cs @@ -1,5 +1,6 @@ namespace Rafty.UnitTests { + using System.Threading.Tasks; using Log; using Shouldly; using Xunit; @@ -10,81 +11,81 @@ public class InMemoryLogTests public void ShouldInitialiseCorrectly() { var log = new InMemoryLog(); - log.LastLogIndex.ShouldBe(1); - log.LastLogTerm.ShouldBe(0); + log.LastLogIndex().Result.ShouldBe(1); + log.LastLogTerm().Result.ShouldBe(0); } [Fact] - public void ShouldApplyLog() + public async Task ShouldApplyLog() { var log = new InMemoryLog(); - var index = log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + var index = await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); index.ShouldBe(1); } [Fact] - public void ShouldSetLastLogIndex() + public async Task ShouldSetLastLogIndex() { var log = new InMemoryLog(); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.LastLogIndex.ShouldBe(1); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + log.LastLogIndex().Result.ShouldBe(1); } [Fact] - public void ShouldSetLastLogTerm() + public async Task ShouldSetLastLogTerm() { var log = new InMemoryLog(); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.LastLogTerm.ShouldBe(1); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + log.LastLogTerm().Result.ShouldBe(1); } [Fact] - public void ShouldGetTermAtIndex() + public async Task ShouldGetTermAtIndex() { var log = new InMemoryLog(); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.GetTermAtIndex(1).ShouldBe(1); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + log.GetTermAtIndex(1).Result.ShouldBe(1); } [Fact] - public void ShouldDeleteConflict() + public async Task ShouldDeleteConflict() { var log = new InMemoryLog(); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 2)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 2)); log.ExposedForTesting.Count.ShouldBe(0); } [Fact] - public void ShouldNotDeleteConflict() + public async Task ShouldNotDeleteConflict() { var log = new InMemoryLog(); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 1)); log.ExposedForTesting.Count.ShouldBe(1); } [Fact] - public void ShouldDeleteConflictAndSubsequentLogs() + public async Task ShouldDeleteConflictAndSubsequentLogs() { var log = new InMemoryLog(); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 2)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.DeleteConflictsFromThisLog(1, new LogEntry(new FakeCommand("test"), typeof(string), 2)); log.ExposedForTesting.Count.ShouldBe(0); } [Fact] - public void ShouldDeleteConflictAndSubsequentLogsFromMidPoint() + public async Task ShouldDeleteConflictAndSubsequentLogsFromMidPoint() { var log = new InMemoryLog(); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.DeleteConflictsFromThisLog(4, new LogEntry(new FakeCommand("test"), typeof(string), 2)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.DeleteConflictsFromThisLog(4, new LogEntry(new FakeCommand("test"), typeof(string), 2)); log.ExposedForTesting.Count.ShouldBe(3); log.ExposedForTesting[1].Term.ShouldBe(1); log.ExposedForTesting[2].Term.ShouldBe(1); @@ -92,12 +93,12 @@ public void ShouldDeleteConflictAndSubsequentLogsFromMidPoint() } [Fact] - public void ShouldRemoveFromLog() + public async Task ShouldRemoveFromLog() { var log = new InMemoryLog(); - var index = log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); - log.Remove(index); - log.Count.ShouldBe(0); + var index = await log.Apply(new LogEntry(new FakeCommand("test"), typeof(string), 1)); + await log.Remove(index); + log.Count().Result.ShouldBe(0); } } } \ No newline at end of file diff --git a/test/Rafty.UnitTests/LeaderTests.cs b/test/Rafty.UnitTests/LeaderTests.cs index 9584da5..fa4b173 100644 --- a/test/Rafty.UnitTests/LeaderTests.cs +++ b/test/Rafty.UnitTests/LeaderTests.cs @@ -198,7 +198,7 @@ bool TestPeerStates() } [Fact] - public void ShouldSendAppendEntriesStartingAtNextIndex() + public async Task ShouldSendAppendEntriesStartingAtNextIndex() { _peers = new List(); for (var i = 0; i < 4; i++) @@ -208,19 +208,19 @@ public void ShouldSendAppendEntriesStartingAtNextIndex() //add 3 logs var logOne = new LogEntry(new FakeCommand("1"), typeof(string), 1); - _log.Apply(logOne); + await _log.Apply(logOne); var logTwo = new LogEntry(new FakeCommand("2"), typeof(string), 1); - _log.Apply(logTwo); + await _log.Apply(logTwo); var logThree = new LogEntry(new FakeCommand("3"), typeof(string), 1); - _log.Apply(logThree); + await _log.Apply(logThree); _currentState = new CurrentState(_id, 1, default(string), 2, 2, default(string)); var leader = new Leader(_currentState, _fsm, (s) => _peers, _log, _node, _settings, _rules); - var logs = _log.GetFrom(1); + var logs = await _log.GetFrom(1); logs.Count.ShouldBe(3); } [Fact] - public void ShouldUpdateMatchIndexAndNextIndexIfSuccessful() + public async Task ShouldUpdateMatchIndexAndNextIndexIfSuccessful() { _peers = new List(); for (var i = 0; i < 4; i++) @@ -230,11 +230,11 @@ public void ShouldUpdateMatchIndexAndNextIndexIfSuccessful() //add 3 logs _currentState = new CurrentState(_id, 1, default(string), 2, 2, default(string)); var logOne = new LogEntry(new FakeCommand("1"), typeof(string), 1); - _log.Apply(logOne); + await _log.Apply(logOne); var logTwo = new LogEntry(new FakeCommand("2"), typeof(string), 1); - _log.Apply(logTwo); + await _log.Apply(logTwo); var logThree = new LogEntry(new FakeCommand("3"), typeof(string), 1); - _log.Apply(logThree); + await _log.Apply(logThree); var leader = new Leader(_currentState, _fsm, (s) => _peers, _log, _node, _settings, _rules); bool FirstTest(List peerState) @@ -595,7 +595,7 @@ bool TestPeerStates(List peerState) return passed == peerState.Count * 2; } var result = WaitFor(1000).Until(() => TestPeerStates(leader.PeerStates)); - _log.Count.ShouldBe(0); + _log.Count().Result.ShouldBe(0); result.ShouldBeTrue(); } @@ -637,7 +637,7 @@ bool TestPeerStates(List peerState) return passed == peerState.Count * 2; } var result = WaitFor(1000).Until(() => TestPeerStates(leader.PeerStates)); - _log.Count.ShouldBe(0); + _log.Count().Result.ShouldBe(0); result.ShouldBeTrue(); } }