Skip to content

Commit

Permalink
exposed async await on rafty mainly for state maching with ocelot
Browse files Browse the repository at this point in the history
  • Loading branch information
tom.pallister committed Apr 5, 2017
1 parent 94f861d commit 24454bb
Show file tree
Hide file tree
Showing 11 changed files with 27 additions and 23 deletions.
4 changes: 2 additions & 2 deletions src/Rafty/Infrastructure/RaftyConfigurationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public static (IApplicationBuilder builder, Server server, ServerInCluster serve
{
TypeNameHandling = TypeNameHandling.All
});
var appendEntriesResponse = server.Receive(appendEntries);
var appendEntriesResponse = await server.Receive(appendEntries);
await context.Response.WriteAsync(JsonConvert.SerializeObject(appendEntriesResponse));
}
catch (Exception exception)
Expand Down Expand Up @@ -110,7 +110,7 @@ public static (IApplicationBuilder builder, Server server, ServerInCluster serve
{
TypeNameHandling = TypeNameHandling.All
});
var sendCommandToLeaderResponse = server.Receive(command);
var sendCommandToLeaderResponse = await server.Receive(command);
await context.Response.WriteAsync(JsonConvert.SerializeObject(sendCommandToLeaderResponse));
}
catch (Exception exception)
Expand Down
2 changes: 1 addition & 1 deletion src/Rafty/Messaging/HttpClientMessageSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public HttpClientMessageSender(IServiceRegistry serviceRegistry, ILogger logger,
{
{typeof(BecomeCandidate), x => _server.Receive((BecomeCandidate) x)},
{typeof(SendHeartbeat), x => _server.Receive((SendHeartbeat) x)},
{typeof(Command), x => _server.Receive((Command) x)},
{typeof(Command), async x => await _server.Receive((Command) x)},
};
}

Expand Down
16 changes: 8 additions & 8 deletions src/Rafty/Raft/Server.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void Receive(BecomeCandidate becomeCandidate)
SendElectionTimeoutMessage(10);
}

public AppendEntriesResponse Receive(AppendEntries appendEntries)
public async Task<AppendEntriesResponse> Receive(AppendEntries appendEntries)
{

if (!_serversInClusterInCluster.Contains(appendEntries.LeaderId))
Expand Down Expand Up @@ -217,7 +217,7 @@ log[lastApplied] to state machine(§5.3)*/
{
LastApplied++;
}
_stateMachine.Apply(Log[LastApplied].Command);
await _stateMachine.Apply(Log[LastApplied].Command);
return new AppendEntriesResponse(CurrentTerm, true, Id, appendEntries.LeaderId);

}
Expand Down Expand Up @@ -254,11 +254,11 @@ public void Receive(SendHeartbeat sendHeartbeat)
}
}

public SendLeaderCommandResponse Receive(ICommand command)
public async Task<SendLeaderCommandResponse> Receive(ICommand command)
{
if (State is Follower)
{
_messageBus.Send(command, LeaderId);
await _messageBus.Send(command, LeaderId);
}

if (State is Leader)
Expand Down Expand Up @@ -306,7 +306,7 @@ public SendLeaderCommandResponse Receive(ICommand command)
{
_logger.LogDebug($"Processing Append entries counter: {counter}");
_logger.LogDebug($"Processing Append entries result was: {task.Result.Success} counter: {counter}");
Receive(task.Result);
await Receive(task.Result);
_logger.LogDebug($"Processed Append entries counter: {counter}");
}
}
Expand Down Expand Up @@ -391,7 +391,7 @@ private void Receive(RequestVoteResponse requestVoteResponse)
}
}

private void Receive(AppendEntriesResponse appendEntriesResponse)
private async Task Receive(AppendEntriesResponse appendEntriesResponse)
{
if (State is Leader)
{
Expand Down Expand Up @@ -428,7 +428,7 @@ private void Receive(AppendEntriesResponse appendEntriesResponse)
{
var entry = Log[Log.Count - 1];
LastApplied = Log.Count - 1;
_stateMachine.Apply(entry.Command);
await _stateMachine.Apply(entry.Command);
}

CurrentTermAppendEntriesResponse = 0;
Expand All @@ -451,7 +451,7 @@ private void Receive(AppendEntriesResponse appendEntriesResponse)

Task.WaitAll(task);

Receive(task.Result);
await Receive(task.Result);
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/Rafty/State/FakeStateMachine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

namespace Rafty.State
{
using System.Threading.Tasks;

public class FakeStateMachine : IStateMachine
{
public List<ICommand> Commands;
Expand All @@ -12,7 +14,7 @@ public FakeStateMachine()
Commands = new List<ICommand>();
}

public void Apply(ICommand command)
public async Task Apply(ICommand command)
{
Commands.Add(command);
}
Expand Down
4 changes: 3 additions & 1 deletion src/Rafty/State/IStateMachine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

namespace Rafty.State
{
using System.Threading.Tasks;

public interface IStateMachine
{
void Apply(ICommand command);
Task Apply(ICommand command);
}
}
2 changes: 1 addition & 1 deletion test/Rafty.UnitTests/AllServersTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ private void ThenTheCurrentTermIs(int expected)

private void ServerReceives(AppendEntries appendEntries)
{
_server.Receive(appendEntries);
_server.Receive(appendEntries).Wait();
}

private void ServerReceives(SendHeartbeat sendHeartbeat)
Expand Down
6 changes: 3 additions & 3 deletions test/Rafty.UnitTests/AppendEntriesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,12 @@ private void ThenTheLogCountIs(int expected)

private void GivenTheServerRecieves(AppendEntries appendEntries)
{
_server.Receive(appendEntries);
_server.Receive(appendEntries).Wait();
}

private void GivenTheCurrentTermIs(int term)
{
_server.Receive(new AppendEntries(term, Guid.NewGuid(), 0, 0, null, 0, Guid.NewGuid()));
_server.Receive(new AppendEntries(term, Guid.NewGuid(), 0, 0, null, 0, Guid.NewGuid())).Wait();
}

private void ThenTheCurrentTermIs(int expected)
Expand All @@ -203,7 +203,7 @@ private void ThenTheReplyIs(AppendEntriesResponse expected)

private void ServerReceives(AppendEntries appendEntries)
{
_result = _server.Receive(appendEntries);
_result = _server.Receive(appendEntries).Result;
}

private void GivenANewServer()
Expand Down
2 changes: 1 addition & 1 deletion test/Rafty.UnitTests/CandidateTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ private void ServerReceives(BecomeCandidate becomeCandidate)

private void ServerReceives(AppendEntries appendEntries)
{
_server.Receive(appendEntries);
_server.Receive(appendEntries).Wait();
}

private void TheServerIsACandidate()
Expand Down
2 changes: 1 addition & 1 deletion test/Rafty.UnitTests/FollowerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private void ThenTheCommandIsForwardedToTheLeader()

private void ServerReceives(FakeCommand fakeCommand)
{
_server.Receive(fakeCommand);
_server.Receive(fakeCommand).Wait();
}

private void ThenTheServerReceivesBecomeCandidate()
Expand Down
4 changes: 2 additions & 2 deletions test/Rafty.UnitTests/LeaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ private void ThenTheServerIsAFollower()

private void ServerReceives(AppendEntries appendEntries)
{
_server.Receive(appendEntries);
_server.Receive(appendEntries).Wait();
}

private void ThenTheServerSendAppendEntriesToEachRemoteServer()
Expand All @@ -362,7 +362,7 @@ private void ThenTheServerSendAppendEntriesToEachRemoteServer()
private void WhenTheServerReceivesACommand(FakeCommand fakeCommand)
{
_fakeCommand = fakeCommand;
_server.Receive(fakeCommand);
_server.Receive(fakeCommand).Wait();
}

private void TheServerWillSendAnotherHeartbeatLater()
Expand Down
4 changes: 2 additions & 2 deletions test/Rafty.UnitTests/RequestVoteTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private void GivenTheCandidatesLogIsAtIndex(int index)

var appendEntries = new AppendEntries(1, Guid.NewGuid(), index, 0, entries, 0, Guid.NewGuid());

_server.Receive(appendEntries);
_server.Receive(appendEntries).Wait();
}

private void ThenTheReplyIs(RequestVoteResponse expected)
Expand Down Expand Up @@ -128,7 +128,7 @@ private void GivenANewServer()

private void GivenTheCurrentTermIs(int term)
{
_server.Receive(new AppendEntries(term, Guid.NewGuid(), 0, 0, null, 0, Guid.NewGuid()));
_server.Receive(new AppendEntries(term, Guid.NewGuid(), 0, 0, null, 0, Guid.NewGuid())).Wait();
}
}
}

0 comments on commit 24454bb

Please sign in to comment.