Skip to content

Commit

Permalink
Merge pull request #2 from mizrael/transaction-support
Browse files Browse the repository at this point in the history
added Transaction support
  • Loading branch information
mizrael authored Jan 3, 2021
2 parents 279c085 + 87ea2cd commit 2e44cc8
Show file tree
Hide file tree
Showing 49 changed files with 175 additions and 144 deletions.
2 changes: 0 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ jobs:
- run:
name: Build solution
command: |
cd ./src
dotnet build
- run:
name: Test
command: |
cd ./src
dotnet test
6 changes: 2 additions & 4 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ jobs:
# and modify them (or add more) to build your code if your project
# uses a compiled language

- run: |
cd ./src
dotnet build -c Release
- run: dotnet build -c Release

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1
uses: github/codeql-action/analyze@v1
4 changes: 1 addition & 3 deletions .github/workflows/nuget.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ jobs:
uses: actions/setup-dotnet@v1

- name: Generate NuGet packages
run: |
cd src
dotnet pack -c Release
run: dotnet pack -c Release

- name: Push packages to GitHub registry
run: dotnet nuget push ./packages/*.nupkg -k ${NUGET_API_KEY} -s https://api.nuget.org/v3/index.json --skip-duplicate
Expand Down
16 changes: 8 additions & 8 deletions src/OpenSleigh.sln → OpenSleigh.sln
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,25 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.30804.86
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Core", "OpenSleigh.Core\OpenSleigh.Core.csproj", "{82774D46-07E9-4A39-A987-D49CD1412DCE}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Core", "src\OpenSleigh.Core\OpenSleigh.Core.csproj", "{82774D46-07E9-4A39-A987-D49CD1412DCE}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Core.Tests", "OpenSleigh.Core.Tests\OpenSleigh.Core.Tests.csproj", "{4AC0DD4D-CB5E-4BF2-87B5-442A06CF38B3}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Core.Tests", "tests\OpenSleigh.Core.Tests\OpenSleigh.Core.Tests.csproj", "{4AC0DD4D-CB5E-4BF2-87B5-442A06CF38B3}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.InMemory", "OpenSleigh.Persistence.InMemory\OpenSleigh.Persistence.InMemory.csproj", "{1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.InMemory", "src\OpenSleigh.Persistence.InMemory\OpenSleigh.Persistence.InMemory.csproj", "{1E8C0BF5-04DA-4228-9F78-C69E7EAE08F9}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.InMemory.Tests", "OpenSleigh.Persistence.InMemory.Tests\OpenSleigh.Persistence.InMemory.Tests.csproj", "{E7C7C0F4-CD99-4F97-A905-13EAABF10498}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.InMemory.Tests", "tests\OpenSleigh.Persistence.InMemory.Tests\OpenSleigh.Persistence.InMemory.Tests.csproj", "{E7C7C0F4-CD99-4F97-A905-13EAABF10498}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Persistence", "Persistence", "{5594CC89-F905-46B2-B938-27B8050D9CA3}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.Mongo", "OpenSleigh.Persistence.Mongo\OpenSleigh.Persistence.Mongo.csproj", "{FD1CCF1D-68C7-49D5-B3A2-E3D34F269B14}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.Mongo", "src\OpenSleigh.Persistence.Mongo\OpenSleigh.Persistence.Mongo.csproj", "{FD1CCF1D-68C7-49D5-B3A2-E3D34F269B14}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.Mongo.Tests", "OpenSleigh.Persistence.Mongo.Tests\OpenSleigh.Persistence.Mongo.Tests.csproj", "{48BF1200-BAA2-48A7-B7DA-1FD0A070FF0E}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.Mongo.Tests", "tests\OpenSleigh.Persistence.Mongo.Tests\OpenSleigh.Persistence.Mongo.Tests.csproj", "{48BF1200-BAA2-48A7-B7DA-1FD0A070FF0E}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Transport", "Transport", "{86CDC8FD-5E6F-4F45-A073-9F2749192582}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Transport.RabbitMQ", "OpenSleigh.Transport.RabbitMQ\OpenSleigh.Transport.RabbitMQ.csproj", "{01C17CD2-1927-4562-9ACF-9566D168096F}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Transport.RabbitMQ", "src\OpenSleigh.Transport.RabbitMQ\OpenSleigh.Transport.RabbitMQ.csproj", "{01C17CD2-1927-4562-9ACF-9566D168096F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenSleigh.Transport.RabbitMQ.Tests", "OpenSleigh.Transport.RabbitMQ.Tests\OpenSleigh.Transport.RabbitMQ.Tests.csproj", "{2E5F3E44-A3E7-429A-81BB-FC907F0D5377}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenSleigh.Transport.RabbitMQ.Tests", "tests\OpenSleigh.Transport.RabbitMQ.Tests\OpenSleigh.Transport.RabbitMQ.Tests.csproj", "{2E5F3E44-A3E7-429A-81BB-FC907F0D5377}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down
12 changes: 7 additions & 5 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class MyAwesomeSaga :
}
```

Dependency injection can be used to reference services from Sagas.

At this point all you have to do is register and configure the Saga:
```
services.AddOpenSleigh(cfg =>{
Expand Down Expand Up @@ -105,18 +107,17 @@ public class MyAwesomeSaga :
```

#### Publishing messages
A message can be published directly from a Saga using the `Publish()` method on the base `Saga` class:
A message can be published by calling the `PublishAsync()` method of `IMessageBus`. Sagas classes get an instance injected as Property:

```
public class MyAwesomeSaga :
Saga<MyAwesomeSagaState>,
IStartedBy<StartMyAwesomeSaga>
{
public async Task HandleAsync(IMessageContext<StartMyAwesomeSaga> context, CancellationToken cancellationToken = default)
{
_logger.LogInformation($"starting saga '{context.Message.CorrelationId}'...");
var message = new MyAwesomeSagaCompleted(Guid.NewGuid(), context.Message.CorrelationId);
this.Publish(message);
this.Bus.PublishAsync(message);
}
}
```
Expand All @@ -126,6 +127,7 @@ OpenSleigh uses the [Outbox pattern](https://www.davideguida.com/improving-micro
A .NET Console application is available in the `/samples/` folder. Before running it, make sure to spin-up the required infrastructure using the provided docker-compose configuration using `docker-compose up`.

## Roadmap
- add more tests
- add more logging
- add Azure ServiceBus message transport
- add CosmosDB saga state persistence
- add CosmosDB saga state persistence
3 changes: 2 additions & 1 deletion samples/OpenSleigh.Samples.Console/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ static async Task Main(string[] args)
var hostBuilder = CreateHostBuilder(args);
var host = hostBuilder.Build();

var bus = host.Services.GetRequiredService<IMessageBus>();
using var scope = host.Services.CreateScope();
var bus = scope.ServiceProvider.GetRequiredService<IMessageBus>();
var message = new StartParentSaga(Guid.NewGuid(), Guid.NewGuid());

await Task.WhenAll(new[]
Expand Down
6 changes: 2 additions & 4 deletions samples/OpenSleigh.Samples.Console/Sagas/ChildSaga.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@ public class ChildSaga :
IStartedBy<StartChildSaga>,
IHandleMessage<ProcessChildSaga>
{
private readonly IMessageBus _bus;
private readonly ILogger<ChildSaga> _logger;

private readonly Random _random = new Random();

public ChildSaga(ILogger<ChildSaga> logger, IMessageBus bus)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_bus = bus ?? throw new ArgumentNullException(nameof(bus));
}

public async Task HandleAsync(IMessageContext<StartChildSaga> context, CancellationToken cancellationToken = default)
Expand All @@ -39,7 +37,7 @@ public async Task HandleAsync(IMessageContext<StartChildSaga> context, Cancellat
await Task.Delay(TimeSpan.FromSeconds(_random.Next(1, 5)), cancellationToken);

var message = new ProcessChildSaga(Guid.NewGuid(), context.Message.CorrelationId);
await _bus.PublishAsync(message, cancellationToken);
await this.Bus.PublishAsync(message, cancellationToken);
}

public async Task HandleAsync(IMessageContext<ProcessChildSaga> context, CancellationToken cancellationToken = default)
Expand All @@ -51,7 +49,7 @@ public async Task HandleAsync(IMessageContext<ProcessChildSaga> context, Cancell
_logger.LogInformation($"child saga '{context.Message.CorrelationId}' completed!");

var completedEvent = new ChildSagaCompleted(Guid.NewGuid(), context.Message.CorrelationId);
await _bus.PublishAsync(completedEvent, cancellationToken);
await this.Bus.PublishAsync(completedEvent, cancellationToken);
}
}
}
10 changes: 4 additions & 6 deletions samples/OpenSleigh.Samples.Console/Sagas/ParentSaga.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,29 @@ public class ParentSaga :
IHandleMessage<ChildSagaCompleted>,
IHandleMessage<ParentSagaCompleted>
{
private readonly IMessageBus _bus;
private readonly ILogger<ParentSaga> _logger;

private readonly Random _random = new Random();

public ParentSaga(ILogger<ParentSaga> logger, IMessageBus bus)
public ParentSaga(ILogger<ParentSaga> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_bus = bus ?? throw new ArgumentNullException(nameof(bus));
}

public async Task HandleAsync(IMessageContext<StartParentSaga> context, CancellationToken cancellationToken = default)
{
_logger.LogInformation($"starting parent saga '{context.Message.CorrelationId}'...");

var message = new ProcessParentSaga(Guid.NewGuid(), context.Message.CorrelationId);
await _bus.PublishAsync(message);
await this.Bus.PublishAsync(message, cancellationToken);
}

public async Task HandleAsync(IMessageContext<ProcessParentSaga> context, CancellationToken cancellationToken = default)
{
_logger.LogInformation($"starting child saga from parent saga '{context.Message.CorrelationId}'...");

var message = new StartChildSaga(Guid.NewGuid(), context.Message.CorrelationId);
await _bus.PublishAsync(message);
await this.Bus.PublishAsync(message, cancellationToken);
}

public async Task HandleAsync(IMessageContext<ChildSagaCompleted> context, CancellationToken cancellationToken = default)
Expand All @@ -57,7 +55,7 @@ public async Task HandleAsync(IMessageContext<ChildSagaCompleted> context, Cance
await Task.Delay(TimeSpan.FromSeconds(_random.Next(1, 5)), cancellationToken);

var message = new ParentSagaCompleted(Guid.NewGuid(), context.Message.CorrelationId);
await _bus.PublishAsync(message);
await this.Bus.PublishAsync(message, cancellationToken);
}

public async Task HandleAsync(IMessageContext<ParentSagaCompleted> context, CancellationToken cancellationToken = default)
Expand Down
6 changes: 6 additions & 0 deletions samples/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ services:
environment:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: password
MONGO_REPLICA_SET_NAME: opensleigh
command: ["--replSet", "opensleigh", "--bind_ip_all"]
healthcheck:
test: test $$(echo "rs.initiate().ok || rs.status().ok" | mongo -u root -p password --quiet) -eq 1
interval: 10s
start_period: 30s
ports:
- 27017:27017

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

namespace OpenSleigh.Core.BackgroundServices
{
//TODO: add another background service to delete processed messages on regular basis

public class PublisherBackgroundService : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
Expand Down
1 change: 1 addition & 0 deletions src/OpenSleigh.Core/DefaultSagaFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public TS Create(TD state)
using var scope = _serviceProvider.CreateScope();
var saga = scope.ServiceProvider.GetRequiredService<TS>();
saga.State = state;
saga.Bus = scope.ServiceProvider.GetRequiredService<IMessageBus>();
return saga;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public static IServiceCollection AddOpenSleigh(this IServiceCollection services,
return resolver;
})
.AddSingleton<IMessageContextFactory, DefaultMessageContextFactory>()
.AddSingleton<IMessageBus, DefaultMessageBus>()
.AddScoped<IMessageBus, DefaultMessageBus>()
.AddSingleton<IMessageProcessor, MessageProcessor>();

var builder = new BusConfigurator(services, stateTypeResolver);
Expand Down
14 changes: 7 additions & 7 deletions src/OpenSleigh.Core/IMessageBus.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System;
using System.Threading;
using System.Threading.Tasks;
using OpenSleigh.Core.Persistence;
Expand All @@ -10,25 +8,27 @@ namespace OpenSleigh.Core
public interface IMessageBus
{
Task PublishAsync<TM>(TM message, CancellationToken cancellationToken = default) where TM : IMessage;
void SetTransaction(ITransaction transaction);
}

internal class DefaultMessageBus : IMessageBus
{
private readonly IOutboxRepository _outboxRepository;
private readonly ILogger<DefaultMessageBus> _logger;
private ITransaction _transaction;

public DefaultMessageBus(IOutboxRepository outboxRepository, ILogger<DefaultMessageBus> logger)
public DefaultMessageBus(IOutboxRepository outboxRepository)
{
_outboxRepository = outboxRepository ?? throw new ArgumentNullException(nameof(outboxRepository));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public async Task PublishAsync<TM>(TM message, CancellationToken cancellationToken = default) where TM : IMessage
{
if (message == null)
throw new ArgumentNullException(nameof(message));

await _outboxRepository.AppendAsync(message, cancellationToken);
await _outboxRepository.AppendAsync(message, _transaction, cancellationToken);
}

public void SetTransaction(ITransaction transaction) => _transaction = transaction;
}
}
3 changes: 2 additions & 1 deletion src/OpenSleigh.Core/ISagaStateService.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using OpenSleigh.Core.Persistence;

namespace OpenSleigh.Core
{
Expand All @@ -11,6 +12,6 @@ public interface ISagaStateService<TS, TD>
Task<(TD state, Guid lockId)> GetAsync<TM>(IMessageContext<TM> messageContext,
CancellationToken cancellationToken = default) where TM : IMessage;

Task SaveAsync(TD state, Guid lockId, CancellationToken cancellationToken = default);
Task SaveAsync(TD state, Guid lockId, ITransaction transaction = null, CancellationToken cancellationToken = default);
}
}
2 changes: 1 addition & 1 deletion src/OpenSleigh.Core/OpenSleigh.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<PackageVersion>0.1.0</PackageVersion>
<PackageVersion>0.2.0</PackageVersion>
<IsPackable>true</IsPackable>
<Authors>davidguida</Authors>
<PackageDescription>OpenSleigh is a distributed saga management library for .NET Core.</PackageDescription>
Expand Down
2 changes: 1 addition & 1 deletion src/OpenSleigh.Core/Persistence/IOutboxRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ public interface IOutboxRepository
{
Task<IEnumerable<IMessage>> ReadMessagesToProcess(CancellationToken cancellationToken = default);
Task MarkAsSentAsync(IMessage message, CancellationToken cancellationToken = default);
Task AppendAsync(IMessage message, CancellationToken cancellationToken);
Task AppendAsync(IMessage message, ITransaction transaction = null, CancellationToken cancellationToken = default);
}
}
2 changes: 1 addition & 1 deletion src/OpenSleigh.Core/Persistence/ISagaStateRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ namespace OpenSleigh.Core.Persistence
public interface ISagaStateRepository
{
Task<(TD state, Guid lockId)> LockAsync<TD>(Guid correlationId, TD newState = null, CancellationToken cancellationToken = default) where TD : SagaState;
Task UpdateAsync<TD>(TD state, Guid lockId, bool releaseLock = false, CancellationToken cancellationToken = default) where TD : SagaState;
Task ReleaseLockAsync<TD>(TD state, Guid lockId, ITransaction transaction = null, CancellationToken cancellationToken = default) where TD : SagaState;
}
}
12 changes: 12 additions & 0 deletions src/OpenSleigh.Core/Persistence/NullTransaction.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System.Threading;
using System.Threading.Tasks;

namespace OpenSleigh.Core.Persistence
{
public class NullTransaction : ITransaction
{
public Task CommitAsync(CancellationToken cancellationToken = default) => Task.CompletedTask;

public Task RollbackAsync(CancellationToken cancellationToken = default) => Task.CompletedTask;
}
}
6 changes: 5 additions & 1 deletion src/OpenSleigh.Core/Saga.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
namespace OpenSleigh.Core
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("OpenSleigh.Core.Tests")]
namespace OpenSleigh.Core
{
public abstract class Saga<TD>
where TD : SagaState
{
public TD State { get; internal set; }
public IMessageBus Bus { get; internal set; }
}
}
Loading

0 comments on commit 2e44cc8

Please sign in to comment.