Skip to content

Commit

Permalink
Merge pull request ThreeMammals#10 from ThreeMammals/feature/async-await
Browse files Browse the repository at this point in the history
Feature/async await
  • Loading branch information
TomPallister authored May 3, 2018
2 parents a274e43 + 1ef1bcf commit fedbf05
Show file tree
Hide file tree
Showing 34 changed files with 643 additions and 604 deletions.
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ You must implement ISettings which provides a description of each member in the
Rafty provides some in memory implementations of its interfaces (you shouldn't use these for anything serious).

```csharp

var log = new InMemoryLog();
var fsm = new InMemoryStateMachine();
var settings = new InMemorySettings(1000, 3500, 50, 5000);
var peersProvider = new InMemoryPeersProvider(_peers);
var node = new Node(fsm, log, settings, peersProvider);
node.Start();

```

The above code will get a Rafty node up and running. If the IPeersProvider does not return any IPeers then it will elect itself leader and just run along happily. If something joins the cluster later it will update that new node as the node will get a heartbeat before it can elect itself. Or an election will start!
Expand All @@ -54,12 +56,14 @@ So in order to get Rafty really running the IPeerProvider needs to return peers.
Finally you need to expose the INode interface to some kind of HTTP. I would advise just a plain old .net core web api type thing. These are the methods you need to expose and the transport in your IPeer should hit these URLS (hope that makes some sense). You can look at NodePeer to see how I do this in memory.

```csharp
AppendEntriesResponse Request(AppendEntries appendEntries);
RequestVoteResponse Request(RequestVote requestVote);
Response<T> Request<T>(T command);

Task<AppendEntriesResponse> Request(AppendEntries appendEntries);
Task<RequestVoteResponse> Request(RequestVote requestVote);
Task<Response<T>> Request<T>(T command);

```

## Further help..
## Further help

The Acceptance and Integration tests will be helpful for anyone who wants to use Rafty.

Expand Down
8 changes: 5 additions & 3 deletions src/Rafty/Concensus/Node/INode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@

namespace Rafty.Concensus
{
using System.Threading.Tasks;

public interface INode
{
IState State { get; }
void BecomeLeader(CurrentState state);
void BecomeFollower(CurrentState state);
void BecomeCandidate(CurrentState state);
AppendEntriesResponse Handle(AppendEntries appendEntries);
RequestVoteResponse Handle(RequestVote requestVote);
Task<AppendEntriesResponse> Handle(AppendEntries appendEntries);
Task<RequestVoteResponse> Handle(RequestVote requestVote);
void Start(string id);
void Stop();
Response<T> Accept<T>(T command) where T : ICommand;
Task<Response<T>> Accept<T>(T command) where T : ICommand;
}
}
15 changes: 9 additions & 6 deletions src/Rafty/Concensus/Node/Node.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

namespace Rafty.Concensus
{
using System.Threading.Tasks;

public class Node : INode
{
private readonly IFiniteStateMachine _fsm;
Expand Down Expand Up @@ -72,20 +74,21 @@ public void BecomeFollower(CurrentState state)
State = new Follower(state, _fsm, _log, _random, this, _settings, _rules, _getPeers(state));
}

public AppendEntriesResponse Handle(AppendEntries appendEntries)
public async Task<AppendEntriesResponse> Handle(AppendEntries appendEntries)
{
return State.Handle(appendEntries);
return await State.Handle(appendEntries);
}

public RequestVoteResponse Handle(RequestVote requestVote)
public async Task<RequestVoteResponse> Handle(RequestVote requestVote)
{
return State.Handle(requestVote);
return await State.Handle(requestVote);
}

public Response<T> Accept<T>(T command) where T : ICommand
public async Task<Response<T>> Accept<T>(T command) where T : ICommand
{
return State.Accept(command);
return await State.Accept(command);
}

public void Stop()
{
State.Stop();
Expand Down
27 changes: 9 additions & 18 deletions src/Rafty/Concensus/Node/NodePeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,57 +5,48 @@

namespace Rafty.AcceptanceTests
{
using System.Threading.Tasks;

public class NodePeer : IPeer
{
private Node _node;

public string Id
{
get
{
if(_node?.State?.CurrentState?.Id != null)
{
return _node.State.CurrentState.Id;
}

return default(string);
}
}
public string Id => _node?.State?.CurrentState?.Id;

public void SetNode (Node node)
{
_node = node;
}

public RequestVoteResponse Request(RequestVote requestVote)
public async Task<RequestVoteResponse> Request(RequestVote requestVote)
{
try
{
return _node.Handle(requestVote);
return await _node.Handle(requestVote);
}
catch(Exception e)
{
return new RequestVoteResponse(false, 0);
}
}

public AppendEntriesResponse Request(AppendEntries appendEntries)
public async Task<AppendEntriesResponse> Request(AppendEntries appendEntries)
{
try
{
return _node.Handle(appendEntries);
return await _node.Handle(appendEntries);
}
catch(Exception e)
{
return new AppendEntriesResponse(0, false);
}
}

public Response<T> Request<T>(T command) where T : ICommand
public async Task<Response<T>> Request<T>(T command) where T : ICommand
{
try
{
return _node.Accept(command);
return await _node.Accept(command);
}
catch(Exception e)
{
Expand Down
8 changes: 5 additions & 3 deletions src/Rafty/Concensus/Peers/IPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

namespace Rafty.Concensus
{
using System.Threading.Tasks;

public interface IPeer
{
/// <summary>
Expand All @@ -13,16 +15,16 @@ public interface IPeer
/// <summary>
/// This will make a requestvote request to the given peer. You must implement the transport.
/// </summary>
RequestVoteResponse Request(RequestVote requestVote);
Task<RequestVoteResponse> Request(RequestVote requestVote);

/// <summary>
/// This will make a appendentries request to the given peer. You must implement the transport.
/// </summary>
AppendEntriesResponse Request(AppendEntries appendEntries);
Task<AppendEntriesResponse> Request(AppendEntries appendEntries);

/// <summary>
/// This will make a command request ot the given peer. You must implement the transport.
/// </summary>
Response<T> Request<T>(T command) where T : ICommand;
Task<Response<T>> Request<T>(T command) where T : ICommand;
}
}
7 changes: 4 additions & 3 deletions src/Rafty/Concensus/Peers/Peer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,23 @@
namespace Rafty.Concensus
{
using System;
using System.Threading.Tasks;

public class Peer : IPeer
{
public string Id => throw new NotImplementedException();

public RequestVoteResponse Request(RequestVote requestVote)
public Task<RequestVoteResponse> Request(RequestVote requestVote)
{
throw new NotImplementedException();
}

public AppendEntriesResponse Request(AppendEntries appendEntries)
public Task<AppendEntriesResponse> Request(AppendEntries appendEntries)
{
throw new NotImplementedException();
}

public Response<T> Request<T>(T command) where T : ICommand
public Task<Response<T>> Request<T>(T command) where T : ICommand
{
throw new NotImplementedException();
}
Expand Down
32 changes: 16 additions & 16 deletions src/Rafty/Concensus/States/Candidate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void BeginElection()
BecomeFollower();
}

public AppendEntriesResponse Handle(AppendEntries appendEntries)
public async Task<AppendEntriesResponse> Handle(AppendEntries appendEntries)
{
var response = _rules.AppendEntriesTermIsLessThanCurrentTerm(appendEntries, CurrentState);

Expand All @@ -84,20 +84,20 @@ public AppendEntriesResponse Handle(AppendEntries appendEntries)
return response.appendEntriesResponse;
}

response = _rules.LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(appendEntries, _log, CurrentState);
response = await _rules.LogDoesntContainEntryAtPreviousLogIndexWhoseTermMatchesPreviousLogTerm(appendEntries, _log, CurrentState);

if(response.shouldReturn)
{
return response.appendEntriesResponse;
}

_rules.DeleteAnyConflictsInLog(appendEntries, _log);
await _rules.DeleteAnyConflictsInLog(appendEntries, _log);

_rules.ApplyEntriesToLog(appendEntries, _log);
await _rules.ApplyEntriesToLog(appendEntries, _log);

var commitIndexAndLastApplied = _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState);
var commitIndexAndLastApplied = await _rules.CommitIndexAndLastApplied(appendEntries, _log, CurrentState);

ApplyToStateMachine(commitIndexAndLastApplied.commitIndex, commitIndexAndLastApplied.lastApplied);
await ApplyToStateMachine(commitIndexAndLastApplied.commitIndex, commitIndexAndLastApplied.lastApplied);

SetLeaderId(appendEntries);

Expand All @@ -106,7 +106,7 @@ public AppendEntriesResponse Handle(AppendEntries appendEntries)
return new AppendEntriesResponse(CurrentState.CurrentTerm, true);
}

public RequestVoteResponse Handle(RequestVote requestVote)
public async Task<RequestVoteResponse> Handle(RequestVote requestVote)
{
var response = RequestVoteTermIsGreaterThanCurrentTerm(requestVote);

Expand All @@ -129,7 +129,7 @@ public RequestVoteResponse Handle(RequestVote requestVote)
return response.requestVoteResponse;
}

response = LastLogIndexAndLastLogTermMatchesThis(requestVote);
response = await LastLogIndexAndLastLogTermMatchesThis(requestVote);

if(response.shouldReturn)
{
Expand All @@ -139,7 +139,7 @@ public RequestVoteResponse Handle(RequestVote requestVote)
return new RequestVoteResponse(false, CurrentState.CurrentTerm);
}

public Response<T> Accept<T>(T command) where T : ICommand
public async Task<Response<T>> Accept<T>(T command) where T : ICommand
{
return new ErrorResponse<T>("Please retry command later. Currently electing new a new leader.", command);
}
Expand Down Expand Up @@ -234,7 +234,7 @@ private void ShouldBecomeLeader()

private async Task RequestVote(IPeer peer, BlockingCollection<RequestVoteResponse> requestVoteResponses)
{
var requestVoteResponse = peer.Request(new RequestVote(CurrentState.CurrentTerm, CurrentState.Id, _log.LastLogIndex, _log.LastLogTerm));
var requestVoteResponse = await peer.Request(new RequestVote(CurrentState.CurrentTerm, CurrentState.Id, await _log.LastLogIndex(), await _log.LastLogTerm()));
requestVoteResponses.Add(requestVoteResponse);
}

Expand Down Expand Up @@ -266,13 +266,13 @@ private void ResetElectionTimer()
}, null, Convert.ToInt32(timeout.TotalMilliseconds), Convert.ToInt32(timeout.TotalMilliseconds));
}

private void ApplyToStateMachine(int commitIndex, int lastApplied)
private async Task ApplyToStateMachine(int commitIndex, int lastApplied)
{
while (commitIndex > lastApplied)
{
lastApplied++;
var log = _log.Get(lastApplied);
_fsm.Handle(log);
var log = await _log.Get(lastApplied);
await _fsm.Handle(log);
}

CurrentState = new CurrentState(CurrentState.Id, CurrentState.CurrentTerm,
Expand Down Expand Up @@ -303,10 +303,10 @@ private void AppendEntriesTermIsGreaterThanCurrentTerm(AppendEntries appendEntri
return (null, false);
}

private (RequestVoteResponse requestVoteResponse, bool shouldReturn) LastLogIndexAndLastLogTermMatchesThis(RequestVote requestVote)
private async Task<(RequestVoteResponse requestVoteResponse, bool shouldReturn)> LastLogIndexAndLastLogTermMatchesThis(RequestVote requestVote)
{
if (requestVote.LastLogIndex == _log.LastLogIndex &&
requestVote.LastLogTerm == _log.LastLogTerm)
if (requestVote.LastLogIndex == await _log.LastLogIndex() &&
requestVote.LastLogTerm == await _log.LastLogTerm())
{
CurrentState = new CurrentState(CurrentState.Id, CurrentState.CurrentTerm, requestVote.CandidateId, CurrentState.CommitIndex, CurrentState.LastApplied, CurrentState.LeaderId);
BecomeFollower();
Expand Down
Loading

0 comments on commit fedbf05

Please sign in to comment.