Skip to content

Commit

Permalink
Refactor PulseFlow library and adapt tests
Browse files Browse the repository at this point in the history
This commit involves a significant refactor of the PulseFlow library entailing changes to its architecture. The previous flow using separate channel classes (Channel.cs and IChannel.cs) has been eliminated in favor of direct usage of System.Threading.Channels in the Conduit and PulseNexus classes. Additional improvements include the introduction of the IPulseHandler interface to extend the handling capabilities, updates to the service collection methods, and the creation of GlobalUsings.cs for streamlined namespace imports. Corresponding tests have also been recalibrated in PulseFlowTests.cs to align with the changes in the library's API and structure.
  • Loading branch information
frankhaugen committed Feb 3, 2024
1 parent e50b8ba commit 9b4cb95
Show file tree
Hide file tree
Showing 17 changed files with 339 additions and 191 deletions.
2 changes: 1 addition & 1 deletion Frank.PulseFlow.Logging/PulseFlowLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public PulseFlowLogger(string categoryName, IConduit conduit, IOptionsMonitor<Lo
/// <param name="exception">The exception associated with the log entry.</param>
/// <param name="formatter">A function that formats the log message.</param>
public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func<TState, Exception?, string> formatter)
=> _conduit.SendAsync(new LogPulse(logLevel, eventId, exception, _categoryName, formatter.Invoke(state, exception), state as IReadOnlyList<KeyValuePair<string, object?>>)).GetAwaiter().GetResult();
=> _conduit.SendAsync(new LogPulse(logLevel, eventId, exception, _categoryName, formatter.Invoke(state, exception), (IReadOnlyList<KeyValuePair<string, object?>>)state!), CancellationToken.None).GetAwaiter().GetResult();

/// <summary>
/// Checks if logging is enabled for the specified log level.
Expand Down
64 changes: 64 additions & 0 deletions Frank.PulseFlow.Logging/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Frank.PulseFlow.Logging

This library provides a simple logger for use in .NET applications. It uses the `Microsoft.Extensions.Logging` library
backed by `Frank.PulseFlow` for logging. It will log to the console by default, but can add one or more `IFlow`'s to do
whatever you want with the log messages. A common use case is to log to a file or a database, and because `Frank.PulseFlow`
is thread-safe, you can do so without worrying about concurrency issues like file locks, or the overhead of waiting for a lock.

## Usage

```csharp
using Frank.PulseFlow.Logging;

public class Program
{
public static async Task Main(string[] args)
{
var builder = new HostBuilder()
.ConfigureLogging((hostContext, logging) =>
{
logging.AddPulseFlow();
})
.ConfigureServices((hostContext, services) =>
{
services.AddPulseFlow<FileLoggerFlow>();
});
.Build();

await builder.RunAsync();
}
}

public class FileLoggerFlow(IOptions<FileLoggerSettings> options) : IFlow
{
private readonly FileLoggerSettings _settings = options.Value;

public async Task HandleAsync(IPulse pulse, CancellationToken cancellationToken)
{
var thing = pulse as LogPulse;
await File.AppendAllTextAsync(_settings.LogPath, thing! + Environment.NewLine, cancellationToken);
}

public bool CanHandle(Type pulseType) => pulseType == typeof(LogPulse);
}

public class FileLoggerSettings
{
public string LogPath { get; set; } = "../../../../logs.log";
}
```

## Configuration

The `AddPulseFlow` method has a few overloads that allow you to configure the logger. The default configuration is to log
to the console, but you can add one or more `IFlow`'s to the logger to do whatever you want with the log messages. A common
use case is to log to a file or a database, and because `Frank.PulseFlow` is thread-safe, you can do so without worrying
about concurrency issues like file locks, or the overhead of waiting for a lock.

## Contributing

Contributions are welcome! Please see create an issue before submitting a pull request to discuss the changes you would like to make.

## License

This library is licensed under the MIT license. See the [LICENSE](../LICENSE) file for more information.
4 changes: 3 additions & 1 deletion Frank.PulseFlow.Tests/Frank.PulseFlow.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Frank.Reflection" Version="1.1.0" />
<PackageReference Include="FluentAssertions" Version="6.12.0" />
<PackageReference Include="Frank.Reflection" Version="1.3.0" />
<PackageReference Include="Frank.Testing.TestBases" Version="1.6.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
<PackageReference Include="xunit" Version="2.6.6" />
Expand Down
208 changes: 156 additions & 52 deletions Frank.PulseFlow.Tests/PulseFlowTests.cs
Original file line number Diff line number Diff line change
@@ -1,93 +1,197 @@
using System.Diagnostics;

using FluentAssertions;

using Frank.PulseFlow.Logging;
using Frank.Reflection;

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

using Xunit.Abstractions;
using Frank.Reflection;
using Frank.Testing.TestBases;

using Microsoft.Extensions.Options;

namespace Frank.PulseFlow.Tests;

public class PulseFlowTests
public class PulseFlowTests(ITestOutputHelper outputHelper) : HostApplicationTestBase(outputHelper)
{
private readonly ITestOutputHelper _outputHelper;
private readonly ITestOutputHelper _outputHelper = outputHelper;
private readonly TestPulseContainer _container = new();

public PulseFlowTests(ITestOutputHelper outputHelper)
/// <inheritdoc />
protected override Task SetupAsync(HostApplicationBuilder builder)
{
_outputHelper = outputHelper;
builder.Logging.AddPulseFlow();

builder.Services.AddPulseFlow<FileLoggerFlow>();
builder.Services.AddPulseFlow(x => x.AddFlow<BlueOutputFlow>().AddFlow<TestOutputHelperFlow>());
builder.Services.AddPulseFlow<RedOutputFlow>();
builder.Services.AddPulseFlow<TimerPulse, TimerHandler>();
builder.Services.AddPulseFlow<TimerPulse, TimerHandler2>();
builder.Services.AddHostedService<MyService>();
builder.Services.AddSingleton(_container);

builder.Services.Configure<FileLoggerSettings>(x => x.LogPath = "logs.log");
_outputHelper.WriteTable(builder.Services.Select(x => new { Service = x.ServiceType.GetFriendlyName(), Implementation = x.ImplementationType?.GetFriendlyName(), x.Lifetime }).OrderBy(x => x.Service));
return Task.CompletedTask;
}

[Fact]
public void Test1()
public async Task Test1()
{
var host = CreateHostBuilder().Build();
await Task.Delay(500);
var overview = new []
{
new
{
Name = "Blue", Count = _container.BlueMessages.Count,
},
new
{
Name = "Red", Count = _container.RedMessages.Count,
},
new
{
Name = "Log", Count = _container.LogMessages.Count,
},
new
{
Name = "Timer", Count = _container.TimerPulses.Count,
},
new
{
Name = "Timer2", Count = _container.TimerPulses2.Count,
},
};

_outputHelper.WriteTable(overview);

host.Start();
await Task.Delay(500);

_container.BlueMessages.Should().NotBeEmpty();
_container.RedMessages.Should().NotBeEmpty();
_container.LogMessages.Should().NotBeEmpty();
_container.TimerPulses.Should().NotBeEmpty();
_container.TimerPulses2.Should().NotBeEmpty();
}

private class MyService : BackgroundService
private class TestOutputHelperFlow(TestPulseContainer container) : IFlow
{
private readonly ILogger<MyService> _logger;
public async Task HandleAsync(IPulse pulse, CancellationToken cancellationToken)
{
var thing = pulse as LogPulse;
container.LogMessages.Add(thing!);
await Task.CompletedTask;
}

public MyService(ILogger<MyService> logger) => _logger = logger;
public bool CanHandle(Type pulseType)
{
return pulseType == typeof(LogPulse);
}
}

private class BlueOutputFlow(TestPulseContainer container) : IFlow
{
public async Task HandleAsync(IPulse pulse, CancellationToken cancellationToken)
{
var thing = pulse as MyMessage;
container.BlueMessages.Add(thing!);
await Task.CompletedTask;
}

public bool CanHandle(Type pulseType)
{
return pulseType == typeof(MyMessage);
}
}

private class RedOutputFlow(TestPulseContainer container) : IFlow
{
public async Task HandleAsync(IPulse pulse, CancellationToken cancellationToken)
{
var thing = pulse as MyMessage;
container.RedMessages.Add(thing!);
await Task.CompletedTask;
}

public bool CanHandle(Type pulseType)
{
return pulseType == typeof(MyMessage);
}
}

private class MyService(IConduit conduit) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Hello from {ServiceName}", nameof(MyService));
try
{
throw new Exception("This is an exception");
}
catch (Exception e)
var stopWatch = Stopwatch.StartNew();
while (!stoppingToken.IsCancellationRequested && stopWatch.Elapsed < TimeSpan.FromSeconds(1))
{
_logger.LogError(e, "This is an exception in {ServiceName}", nameof(MyService));
await conduit.SendAsync(new MyMessage("Hello, World! " + stopWatch.Elapsed.ToString("c")), stoppingToken);
await conduit.SendAsync(new TimerPulse() {Elapsed = stopWatch.Elapsed}, stoppingToken);
}
await Task.Delay(1000, stoppingToken);
}

}

private IHostBuilder CreateHostBuilder()
private class TimerHandler(TestPulseContainer container) : IPulseHandler<TimerPulse>
{
return Host.CreateDefaultBuilder()
.ConfigureLogging(logging =>
{
logging.ClearProviders();
logging.AddPulseFlow();
})
.ConfigureServices((context, services) =>
{
services.AddSingleton<ITestOutputHelper>(_outputHelper);
services.AddPulseFlow(builder =>
{
builder.AddFlow<TestOutputFlow>();
});

services.AddHostedService<MyService>();
});
public async Task HandleAsync(TimerPulse pulse, CancellationToken cancellationToken)
{
container.TimerPulses.Add(pulse);
await Task.CompletedTask;
}
}

private class TestOutputFlow : IFlow
private class TimerHandler2(TestPulseContainer container) : IPulseHandler<TimerPulse>
{
private readonly ITestOutputHelper _outputHelper;

public TestOutputFlow(ITestOutputHelper outputHelper)
public async Task HandleAsync(TimerPulse pulse, CancellationToken cancellationToken)
{
_outputHelper = outputHelper;
container.TimerPulses2.Add(pulse);
await Task.CompletedTask;
}

}

private class MyMessage(string message) : BasePulse
{
public string Message { get; set; } = message;

public override string ToString() => $"MyMessage: {Message}";
}

private class TimerPulse : BasePulse
{
public TimeSpan Elapsed { get; set; }
}

private class TestPulseContainer
{
public List<MyMessage> BlueMessages { get; } = new();
public List<MyMessage> RedMessages { get; } = new();
public List<LogPulse> LogMessages { get; } = new();
public List<TimerPulse> TimerPulses { get; } = new();
public List<TimerPulse> TimerPulses2 { get; } = new();

}

public class FileLoggerFlow(IOptions<FileLoggerSettings> options) : IFlow
{
private readonly FileLoggerSettings _settings = options.Value;

public async Task HandleAsync(IPulse pulse, CancellationToken cancellationToken)
{
var thing = pulse as LogPulse;
var message = thing!.ToString();
_outputHelper.WriteLine(message);
await File.AppendAllTextAsync(_settings.LogPath!, thing! + Environment.NewLine, cancellationToken);
await Task.CompletedTask;
}

public bool CanHandle(Type pulseType)
{
_outputHelper.WriteLine($"CanHandle: {pulseType.GetFriendlyName()}");
return pulseType.BaseType == typeof(LogPulse);
}
public bool CanHandle(Type pulseType) => pulseType == typeof(LogPulse);
}

public class FileLoggerSettings
{
public string? LogPath { get; set; }
}
}
}
3 changes: 2 additions & 1 deletion Frank.PulseFlow/Frank.PulseFlow.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
<PackageReference Update="Microsoft.SourceLink.GitHub" Version="8.0.0" />
<PackageReference Include="Frank.Channels.DependencyInjection" Version="1.1.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
</ItemGroup>
</Project>
9 changes: 9 additions & 0 deletions Frank.PulseFlow/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Global using directives

global using System.Threading.Channels;

global using Frank.Channels.DependencyInjection;
global using Frank.PulseFlow.Internal;

global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.Hosting;
4 changes: 2 additions & 2 deletions Frank.PulseFlow/IConduit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ public interface IConduit
/// Sends a pulse to the underlying infrastructure.
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
Task SendAsync(IPulse message);
/// <param name="cancellationToken"></param>
Task SendAsync(IPulse message, CancellationToken cancellationToken);
}
15 changes: 15 additions & 0 deletions Frank.PulseFlow/IPulseHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace Frank.PulseFlow;

/// IPulseHandler Interface
/// Represents a handler for processing pulses.
/// /// <typeparam name="T">The type of pulse to handle.</typeparam>
public interface IPulseHandler<in T> where T : IPulse
{
/// <summary>
/// Handles the pulse asynchronously.
/// </summary>
/// <param name="pulse">The pulse to be handled.</param>
/// <param name="cancellationToken">The cancellation token that can be used to cancel the operation.</param>
/// <returns>A task representing the asynchronous operation.</returns>
Task HandleAsync(T pulse, CancellationToken cancellationToken);
}
3 changes: 3 additions & 0 deletions Frank.PulseFlow/IncompatibleFlowException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Frank.PulseFlow;

public class IncompatibleFlowException(string s) : Exception(s);
Loading

0 comments on commit 9b4cb95

Please sign in to comment.