From 24454bb630eb50e4231b6921d9919c5e495dae36 Mon Sep 17 00:00:00 2001 From: "tom.pallister" Date: Wed, 5 Apr 2017 17:01:39 +0100 Subject: [PATCH] exposed async await on rafty mainly for state maching with ocelot --- .../RaftyConfigurationExtensions.cs | 4 ++-- src/Rafty/Messaging/HttpClientMessageSender.cs | 2 +- src/Rafty/Raft/Server.cs | 16 ++++++++-------- src/Rafty/State/FakeStateMachine.cs | 4 +++- src/Rafty/State/IStateMachine.cs | 4 +++- test/Rafty.UnitTests/AllServersTests.cs | 2 +- test/Rafty.UnitTests/AppendEntriesTests.cs | 6 +++--- test/Rafty.UnitTests/CandidateTests.cs | 2 +- test/Rafty.UnitTests/FollowerTests.cs | 2 +- test/Rafty.UnitTests/LeaderTests.cs | 4 ++-- test/Rafty.UnitTests/RequestVoteTests.cs | 4 ++-- 11 files changed, 27 insertions(+), 23 deletions(-) diff --git a/src/Rafty/Infrastructure/RaftyConfigurationExtensions.cs b/src/Rafty/Infrastructure/RaftyConfigurationExtensions.cs index e105277..f621d92 100644 --- a/src/Rafty/Infrastructure/RaftyConfigurationExtensions.cs +++ b/src/Rafty/Infrastructure/RaftyConfigurationExtensions.cs @@ -66,7 +66,7 @@ public static (IApplicationBuilder builder, Server server, ServerInCluster serve { TypeNameHandling = TypeNameHandling.All }); - var appendEntriesResponse = server.Receive(appendEntries); + var appendEntriesResponse = await server.Receive(appendEntries); await context.Response.WriteAsync(JsonConvert.SerializeObject(appendEntriesResponse)); } catch (Exception exception) @@ -110,7 +110,7 @@ public static (IApplicationBuilder builder, Server server, ServerInCluster serve { TypeNameHandling = TypeNameHandling.All }); - var sendCommandToLeaderResponse = server.Receive(command); + var sendCommandToLeaderResponse = await server.Receive(command); await context.Response.WriteAsync(JsonConvert.SerializeObject(sendCommandToLeaderResponse)); } catch (Exception exception) diff --git a/src/Rafty/Messaging/HttpClientMessageSender.cs b/src/Rafty/Messaging/HttpClientMessageSender.cs index f5bd5e5..9bed504 100644 --- a/src/Rafty/Messaging/HttpClientMessageSender.cs +++ b/src/Rafty/Messaging/HttpClientMessageSender.cs @@ -41,7 +41,7 @@ public HttpClientMessageSender(IServiceRegistry serviceRegistry, ILogger logger, { {typeof(BecomeCandidate), x => _server.Receive((BecomeCandidate) x)}, {typeof(SendHeartbeat), x => _server.Receive((SendHeartbeat) x)}, - {typeof(Command), x => _server.Receive((Command) x)}, + {typeof(Command), async x => await _server.Receive((Command) x)}, }; } diff --git a/src/Rafty/Raft/Server.cs b/src/Rafty/Raft/Server.cs index 2a19529..4326814 100644 --- a/src/Rafty/Raft/Server.cs +++ b/src/Rafty/Raft/Server.cs @@ -128,7 +128,7 @@ public void Receive(BecomeCandidate becomeCandidate) SendElectionTimeoutMessage(10); } - public AppendEntriesResponse Receive(AppendEntries appendEntries) + public async Task Receive(AppendEntries appendEntries) { if (!_serversInClusterInCluster.Contains(appendEntries.LeaderId)) @@ -217,7 +217,7 @@ log[lastApplied] to state machine(ยง5.3)*/ { LastApplied++; } - _stateMachine.Apply(Log[LastApplied].Command); + await _stateMachine.Apply(Log[LastApplied].Command); return new AppendEntriesResponse(CurrentTerm, true, Id, appendEntries.LeaderId); } @@ -254,11 +254,11 @@ public void Receive(SendHeartbeat sendHeartbeat) } } - public SendLeaderCommandResponse Receive(ICommand command) + public async Task Receive(ICommand command) { if (State is Follower) { - _messageBus.Send(command, LeaderId); + await _messageBus.Send(command, LeaderId); } if (State is Leader) @@ -306,7 +306,7 @@ public SendLeaderCommandResponse Receive(ICommand command) { _logger.LogDebug($"Processing Append entries counter: {counter}"); _logger.LogDebug($"Processing Append entries result was: {task.Result.Success} counter: {counter}"); - Receive(task.Result); + await Receive(task.Result); _logger.LogDebug($"Processed Append entries counter: {counter}"); } } @@ -391,7 +391,7 @@ private void Receive(RequestVoteResponse requestVoteResponse) } } - private void Receive(AppendEntriesResponse appendEntriesResponse) + private async Task Receive(AppendEntriesResponse appendEntriesResponse) { if (State is Leader) { @@ -428,7 +428,7 @@ private void Receive(AppendEntriesResponse appendEntriesResponse) { var entry = Log[Log.Count - 1]; LastApplied = Log.Count - 1; - _stateMachine.Apply(entry.Command); + await _stateMachine.Apply(entry.Command); } CurrentTermAppendEntriesResponse = 0; @@ -451,7 +451,7 @@ private void Receive(AppendEntriesResponse appendEntriesResponse) Task.WaitAll(task); - Receive(task.Result); + await Receive(task.Result); } } } diff --git a/src/Rafty/State/FakeStateMachine.cs b/src/Rafty/State/FakeStateMachine.cs index 687a4b5..07a41b7 100644 --- a/src/Rafty/State/FakeStateMachine.cs +++ b/src/Rafty/State/FakeStateMachine.cs @@ -3,6 +3,8 @@ namespace Rafty.State { + using System.Threading.Tasks; + public class FakeStateMachine : IStateMachine { public List Commands; @@ -12,7 +14,7 @@ public FakeStateMachine() Commands = new List(); } - public void Apply(ICommand command) + public async Task Apply(ICommand command) { Commands.Add(command); } diff --git a/src/Rafty/State/IStateMachine.cs b/src/Rafty/State/IStateMachine.cs index 33d108a..3a59144 100644 --- a/src/Rafty/State/IStateMachine.cs +++ b/src/Rafty/State/IStateMachine.cs @@ -2,8 +2,10 @@ namespace Rafty.State { + using System.Threading.Tasks; + public interface IStateMachine { - void Apply(ICommand command); + Task Apply(ICommand command); } } \ No newline at end of file diff --git a/test/Rafty.UnitTests/AllServersTests.cs b/test/Rafty.UnitTests/AllServersTests.cs index 96a3554..6eb3dd9 100644 --- a/test/Rafty.UnitTests/AllServersTests.cs +++ b/test/Rafty.UnitTests/AllServersTests.cs @@ -170,7 +170,7 @@ private void ThenTheCurrentTermIs(int expected) private void ServerReceives(AppendEntries appendEntries) { - _server.Receive(appendEntries); + _server.Receive(appendEntries).Wait(); } private void ServerReceives(SendHeartbeat sendHeartbeat) diff --git a/test/Rafty.UnitTests/AppendEntriesTests.cs b/test/Rafty.UnitTests/AppendEntriesTests.cs index 33ee8db..8d7522f 100644 --- a/test/Rafty.UnitTests/AppendEntriesTests.cs +++ b/test/Rafty.UnitTests/AppendEntriesTests.cs @@ -182,12 +182,12 @@ private void ThenTheLogCountIs(int expected) private void GivenTheServerRecieves(AppendEntries appendEntries) { - _server.Receive(appendEntries); + _server.Receive(appendEntries).Wait(); } private void GivenTheCurrentTermIs(int term) { - _server.Receive(new AppendEntries(term, Guid.NewGuid(), 0, 0, null, 0, Guid.NewGuid())); + _server.Receive(new AppendEntries(term, Guid.NewGuid(), 0, 0, null, 0, Guid.NewGuid())).Wait(); } private void ThenTheCurrentTermIs(int expected) @@ -203,7 +203,7 @@ private void ThenTheReplyIs(AppendEntriesResponse expected) private void ServerReceives(AppendEntries appendEntries) { - _result = _server.Receive(appendEntries); + _result = _server.Receive(appendEntries).Result; } private void GivenANewServer() diff --git a/test/Rafty.UnitTests/CandidateTests.cs b/test/Rafty.UnitTests/CandidateTests.cs index 560c721..9f03c83 100644 --- a/test/Rafty.UnitTests/CandidateTests.cs +++ b/test/Rafty.UnitTests/CandidateTests.cs @@ -232,7 +232,7 @@ private void ServerReceives(BecomeCandidate becomeCandidate) private void ServerReceives(AppendEntries appendEntries) { - _server.Receive(appendEntries); + _server.Receive(appendEntries).Wait(); } private void TheServerIsACandidate() diff --git a/test/Rafty.UnitTests/FollowerTests.cs b/test/Rafty.UnitTests/FollowerTests.cs index f70515b..c547e7c 100644 --- a/test/Rafty.UnitTests/FollowerTests.cs +++ b/test/Rafty.UnitTests/FollowerTests.cs @@ -62,7 +62,7 @@ private void ThenTheCommandIsForwardedToTheLeader() private void ServerReceives(FakeCommand fakeCommand) { - _server.Receive(fakeCommand); + _server.Receive(fakeCommand).Wait(); } private void ThenTheServerReceivesBecomeCandidate() diff --git a/test/Rafty.UnitTests/LeaderTests.cs b/test/Rafty.UnitTests/LeaderTests.cs index 003c4d9..7dfa104 100644 --- a/test/Rafty.UnitTests/LeaderTests.cs +++ b/test/Rafty.UnitTests/LeaderTests.cs @@ -350,7 +350,7 @@ private void ThenTheServerIsAFollower() private void ServerReceives(AppendEntries appendEntries) { - _server.Receive(appendEntries); + _server.Receive(appendEntries).Wait(); } private void ThenTheServerSendAppendEntriesToEachRemoteServer() @@ -362,7 +362,7 @@ private void ThenTheServerSendAppendEntriesToEachRemoteServer() private void WhenTheServerReceivesACommand(FakeCommand fakeCommand) { _fakeCommand = fakeCommand; - _server.Receive(fakeCommand); + _server.Receive(fakeCommand).Wait(); } private void TheServerWillSendAnotherHeartbeatLater() diff --git a/test/Rafty.UnitTests/RequestVoteTests.cs b/test/Rafty.UnitTests/RequestVoteTests.cs index 736ff5f..9544fec 100644 --- a/test/Rafty.UnitTests/RequestVoteTests.cs +++ b/test/Rafty.UnitTests/RequestVoteTests.cs @@ -100,7 +100,7 @@ private void GivenTheCandidatesLogIsAtIndex(int index) var appendEntries = new AppendEntries(1, Guid.NewGuid(), index, 0, entries, 0, Guid.NewGuid()); - _server.Receive(appendEntries); + _server.Receive(appendEntries).Wait(); } private void ThenTheReplyIs(RequestVoteResponse expected) @@ -128,7 +128,7 @@ private void GivenANewServer() private void GivenTheCurrentTermIs(int term) { - _server.Receive(new AppendEntries(term, Guid.NewGuid(), 0, 0, null, 0, Guid.NewGuid())); + _server.Receive(new AppendEntries(term, Guid.NewGuid(), 0, 0, null, 0, Guid.NewGuid())).Wait(); } } }