Skip to content

Commit

Permalink
Merge pull request ThreeMammals#11 from ThreeMammals/feature/writing-…
Browse files Browse the repository at this point in the history
…same-log-multiple-times

test fails intermitently because log written to more than once..work …
  • Loading branch information
TomPallister authored May 29, 2018
2 parents 585aad4 + 28f234f commit 98e7d50
Show file tree
Hide file tree
Showing 61 changed files with 1,295 additions and 470 deletions.
2 changes: 1 addition & 1 deletion Rafty.sln
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
version.ps1 = version.ps1
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rafty.IntegrationTests", "test\Rafty.IntegrationTests\Rafty.IntegrationTests.csproj", "{BBB04228-BC0E-4711-9C80-769C5FAD777A}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Rafty.IntegrationTests", "test\Rafty.IntegrationTests\Rafty.IntegrationTests.csproj", "{BBB04228-BC0E-4711-9C80-769C5FAD777A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down
3 changes: 2 additions & 1 deletion src/Rafty/Concensus/Messages/AppendEntries.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
namespace Rafty.Concensus
namespace Rafty.Concensus.Messages
{
using System;
using System.Collections.Generic;
using Infrastructure;
using Log;

public sealed class AppendEntries : Message
Expand Down
3 changes: 1 addition & 2 deletions src/Rafty/Concensus/Messages/AppendEntriesBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
namespace Rafty.Concensus
namespace Rafty.Concensus.Messages
{
using System;
using System.Collections.Generic;
using Log;

Expand Down
2 changes: 1 addition & 1 deletion src/Rafty/Concensus/Messages/AppendEntriesResponse.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Rafty.Concensus
namespace Rafty.Concensus.Messages
{
public sealed class AppendEntriesResponse
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Rafty.Concensus
namespace Rafty.Concensus.Messages
{
public class AppendEntriesResponseBuilder
{
Expand Down
4 changes: 1 addition & 3 deletions src/Rafty/Concensus/Messages/RequestVote.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System;

namespace Rafty.Concensus
namespace Rafty.Concensus.Messages
{
public sealed class RequestVote
{
Expand Down
4 changes: 1 addition & 3 deletions src/Rafty/Concensus/Messages/RequestVoteBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System;

namespace Rafty.Concensus
namespace Rafty.Concensus.Messages
{
public class RequestVoteBuilder
{
Expand Down
2 changes: 1 addition & 1 deletion src/Rafty/Concensus/Messages/RequestVoteResponse.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Rafty.Concensus
namespace Rafty.Concensus.Messages
{
public sealed class RequestVoteResponse
{
Expand Down
2 changes: 1 addition & 1 deletion src/Rafty/Concensus/Messages/RequestVoteResponseBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Rafty.Concensus
namespace Rafty.Concensus.Messages
{
public class RequestVoteResponseBuilder
{
Expand Down
11 changes: 6 additions & 5 deletions src/Rafty/Concensus/Node/INode.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
using System;
using Rafty.FiniteStateMachine;

namespace Rafty.Concensus
namespace Rafty.Concensus.Node
{
using System.Threading.Tasks;
using FiniteStateMachine;
using Infrastructure;
using Messages;
using States;

public interface INode
{
Expand All @@ -13,7 +14,7 @@ public interface INode
void BecomeCandidate(CurrentState state);
Task<AppendEntriesResponse> Handle(AppendEntries appendEntries);
Task<RequestVoteResponse> Handle(RequestVote requestVote);
void Start(string id);
void Start(NodeId id);
void Stop();
Task<Response<T>> Accept<T>(T command) where T : ICommand;
}
Expand Down
108 changes: 78 additions & 30 deletions src/Rafty/Concensus/Node/Node.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Rafty.Concensus.States;
using Rafty.FiniteStateMachine;
using Rafty.Infrastructure;
using Rafty.Log;

namespace Rafty.Concensus
namespace Rafty.Concensus.Node
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FiniteStateMachine;
using Infrastructure;
using Log;
using Messages;
using Microsoft.Extensions.Logging;
using Peers;
using States;

public class Node : INode
{
Expand All @@ -19,15 +22,21 @@ public class Node : INode
private readonly ISettings _settings;
private IRules _rules;
private IPeersProvider _peersProvider;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger<Node> _logger;
private readonly SemaphoreSlim _appendingEntries = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _requestVote = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _acceptCommand = new SemaphoreSlim(1, 1);

public Node(
IFiniteStateMachine fsm,
ILog log,
IFiniteStateMachine fsm,
ILog log,
ISettings settings,
IPeersProvider peersProvider)
IPeersProvider peersProvider,
ILoggerFactory loggerFactory)
{
//dont think rules should be injected at the moment..EEK UNCLE BOB
_rules = new Rules();
_loggerFactory = loggerFactory;
_logger = _loggerFactory.CreateLogger<Node>();
_fsm = fsm;
_log = log;
_random = new RandomDelay();
Expand All @@ -42,54 +51,93 @@ public Node(

public IState State { get; private set; }

public void Start(string id)
public void Start(NodeId id)
{
if(State?.CurrentState == null)
{
BecomeFollower(new CurrentState(id, 0, default(string), 0, 0, default(string)));
}
else
{
BecomeFollower(State.CurrentState);
}
_rules = new Rules(_loggerFactory, id);

BecomeFollower(State?.CurrentState ?? new CurrentState(id.Id, 0, default(string), 0, 0, default(string)));
}

public void BecomeCandidate(CurrentState state)
{
State.Stop();
var candidate = new Candidate(state, _fsm, _getPeers(state), _log, _random, this, _settings, _rules);
State?.Stop();
_logger.LogInformation($"{state.Id} became candidate");
var candidate = new Candidate(state, _fsm, _getPeers(state), _log, _random, this, _settings, _rules, _loggerFactory);
State = candidate;
candidate.BeginElection();
}

public void BecomeLeader(CurrentState state)
{
State.Stop();
State = new Leader(state, _fsm, _getPeers, _log, this, _settings, _rules);
_logger.LogInformation($"{state.Id} became leader");
State = new Leader(state, _fsm, _getPeers, _log, this, _settings, _rules, _loggerFactory);
}

public void BecomeFollower(CurrentState state)
{
State?.Stop();
State = new Follower(state, _fsm, _log, _random, this, _settings, _rules, _getPeers(state));
_logger.LogInformation($"{state.Id} became follower");
State = new Follower(state, _fsm, _log, _random, this, _settings, _rules, _getPeers(state), _loggerFactory);
}

public async Task<AppendEntriesResponse> Handle(AppendEntries appendEntries)
{
return await State.Handle(appendEntries);
try
{
await _appendingEntries.WaitAsync();

var response = await State.Handle(appendEntries);

if (appendEntries.Entries.Any())
{
_logger.LogInformation($"{State.GetType().Name} id: {State.CurrentState.Id} responded to appendentries with success: {response.Success} and term: {response.Term}");
}

return response;
}
finally
{
_appendingEntries.Release();
}
}

public async Task<RequestVoteResponse> Handle(RequestVote requestVote)
{
return await State.Handle(requestVote);
try
{
await _requestVote.WaitAsync();

return await State.Handle(requestVote);
}
finally
{
_requestVote.Release();
}
}

public async Task<Response<T>> Accept<T>(T command) where T : ICommand
{
return await State.Accept(command);
try
{
await _acceptCommand.WaitAsync();

return await State.Accept(command);
}
finally
{
_acceptCommand.Release();

}
}

public void Stop()
{
State.Stop();
State = null;
}

public void Pause()
{
State.Stop();
}
Expand Down
12 changes: 6 additions & 6 deletions src/Rafty/Concensus/Node/NodePeer.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
using System;
using System.Net.Http;
using Rafty.Concensus;
using Rafty.FiniteStateMachine;

namespace Rafty.AcceptanceTests
namespace Rafty.Concensus.Node
{
using System;
using System.Threading.Tasks;
using FiniteStateMachine;
using Infrastructure;
using Messages;
using Peers;

public class NodePeer : IPeer
{
Expand Down
8 changes: 4 additions & 4 deletions src/Rafty/Concensus/Peers/IPeer.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
using System;
using Rafty.FiniteStateMachine;

namespace Rafty.Concensus
namespace Rafty.Concensus.Peers
{
using System.Threading.Tasks;
using FiniteStateMachine;
using Infrastructure;
using Messages;

public interface IPeer
{
Expand Down
7 changes: 4 additions & 3 deletions src/Rafty/Concensus/Peers/Peer.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
using Rafty.FiniteStateMachine;

namespace Rafty.Concensus
namespace Rafty.Concensus.Peers
{
using System;
using System.Threading.Tasks;
using FiniteStateMachine;
using Infrastructure;
using Messages;

public class Peer : IPeer
{
Expand Down
Loading

0 comments on commit 98e7d50

Please sign in to comment.