Skip to content

Commit

Permalink
Healthchecks and WaitFor implemented by @davidfowl
Browse files Browse the repository at this point in the history
- Enable waiting for dependencies before starting resources
Original code:
https://github.com/davidfowl/WaitForDependenciesAspire
  • Loading branch information
fredimachado committed May 18, 2024
1 parent b817bf8 commit cb593f2
Show file tree
Hide file tree
Showing 10 changed files with 387 additions and 8 deletions.
11 changes: 7 additions & 4 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
<PackageVersion Include="Aspire.Hosting" Version="8.0.0-preview.7.24251.11" />
<PackageVersion Include="Aspire.Hosting.AppHost" Version="8.0.0-preview.7.24251.11" />
<PackageVersion Include="Aspire.Hosting.RabbitMQ" Version="8.0.0-preview.7.24251.11" />
<PackageVersion Include="Microsoft.Extensions.Http.Resilience" Version="8.3.0" />
<PackageVersion Include="Aspire.RabbitMQ.Client" Version="8.0.0-preview.7.24251.11" />
<PackageVersion Include="AspNetCore.HealthChecks.EventStore.gRPC" Version="6.0.1" />
<PackageVersion Include="AspNetCore.HealthChecks.Rabbitmq" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.Http.Resilience" Version="8.5.0" />
<PackageVersion Include="Microsoft.Extensions.ServiceDiscovery" Version="8.0.0-preview.7.24251.11" />
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.8.1" />
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.8.1" />
Expand All @@ -23,9 +26,9 @@
<PackageVersion Include="FakeItEasy" Version="8.2.0" />
<PackageVersion Include="FluentAssertions" Version="6.12.0" />
<PackageVersion Include="FluentAssertions.Analyzers" Version="0.31.0" />
<PackageVersion Include="Microsoft.AspNetCore.Components.WebAssembly" Version="8.0.4" />
<PackageVersion Include="Microsoft.AspNetCore.Components.WebAssembly.DevServer" Version="8.0.4" />
<PackageVersion Include="Microsoft.AspNetCore.SignalR.Client" Version="8.0.4" />
<PackageVersion Include="Microsoft.AspNetCore.Components.WebAssembly" Version="8.0.5" />
<PackageVersion Include="Microsoft.AspNetCore.Components.WebAssembly.DevServer" Version="8.0.5" />
<PackageVersion Include="Microsoft.AspNetCore.SignalR.Client" Version="8.0.5" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageVersion Include="MinVer" Version="4.3.0" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using HealthChecks.EventStore.gRPC;

namespace Aspire.Hosting;

public static class EventStoreHealthCheckExtensions
{
/// <summary>
/// Adds a health check to the EventStore resource.
/// </summary>
public static IResourceBuilder<EventStoreResource> WithHealthCheck(this IResourceBuilder<EventStoreResource> builder)
{
return builder.WithAnnotation(
HealthCheckAnnotation.Create(connectionString => new EventStoreHealthCheck(connectionString)));
}
}
30 changes: 30 additions & 0 deletions src/Aspire/NCafe.AppHost/Hosting/HealthCheckAnnotation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using Microsoft.Extensions.Diagnostics.HealthChecks;

namespace Aspire.Hosting;

/// <summary>
/// An annotation that associates a health check factory with a resource
/// </summary>
/// <param name="healthCheckFactory">A function that creates the health check</param>
public class HealthCheckAnnotation(Func<IResource, CancellationToken, Task<IHealthCheck?>> healthCheckFactory) : IResourceAnnotation
{
public Func<IResource, CancellationToken, Task<IHealthCheck?>> HealthCheckFactory { get; } = healthCheckFactory;

public static HealthCheckAnnotation Create(Func<string, IHealthCheck> connectionStringFactory)
{
return new(async (resource, token) =>
{
if (resource is not IResourceWithConnectionString c)
{
return null;
}

if (await c.GetConnectionStringAsync(token) is not string cs)
{
return null;
}

return connectionStringFactory(cs);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using HealthChecks.RabbitMQ;

namespace Aspire.Hosting;

public static class RabbitMQResourceHealthCheckExtensions
{
/// <summary>
/// Adds a health check to the RabbitMQ server resource.
/// </summary>
public static IResourceBuilder<RabbitMQServerResource> WithHealthCheck(this IResourceBuilder<RabbitMQServerResource> builder)
{
return builder.WithAnnotation(
HealthCheckAnnotation.Create(cs => new RabbitMQHealthCheck(new RabbitMQHealthCheckOptions { ConnectionUri = new(cs) })));
}
}
300 changes: 300 additions & 0 deletions src/Aspire/NCafe.AppHost/Hosting/WaitForDependenciesExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
using System.Collections.Concurrent;
using System.Runtime.ExceptionServices;
using Aspire.Hosting.Lifecycle;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
using Polly;
using Polly.Retry;

namespace Aspire.Hosting;

public static class WaitForDependenciesExtensions
{
/// <summary>
/// Wait for a resource to be running before starting another resource.
/// </summary>
/// <typeparam name="T">The resource type.</typeparam>
/// <param name="builder">The resource builder.</param>
/// <param name="other">The resource to wait for.</param>
public static IResourceBuilder<T> WaitFor<T>(this IResourceBuilder<T> builder, IResourceBuilder<IResource> other)
where T : IResource
{
builder.ApplicationBuilder.AddWaitForDependencies();
return builder.WithAnnotation(new WaitOnAnnotation(other.Resource));
}

/// <summary>
/// Wait for a resource to run to completion before starting another resource.
/// </summary>
/// <typeparam name="T">The resource type.</typeparam>
/// <param name="builder">The resource builder.</param>
/// <param name="other">The resource to wait for.</param>
public static IResourceBuilder<T> WaitForCompletion<T>(this IResourceBuilder<T> builder, IResourceBuilder<IResource> other)
where T : IResource
{
builder.ApplicationBuilder.AddWaitForDependencies();
return builder.WithAnnotation(new WaitOnAnnotation(other.Resource) { WaitUntilCompleted = true });
}

/// <summary>
/// Adds a lifecycle hook that waits for all dependencies to be "running" before starting resources. If that resource
/// has a health check, it will be executed before the resource is considered "running".
/// </summary>
/// <param name="builder">The <see cref="IDistributedApplicationBuilder"/>.</param>
private static IDistributedApplicationBuilder AddWaitForDependencies(this IDistributedApplicationBuilder builder)
{
builder.Services.TryAddLifecycleHook<WaitForDependenciesRunningHook>();
return builder;
}

private class WaitOnAnnotation(IResource resource) : IResourceAnnotation
{
public IResource Resource { get; } = resource;

public string[]? States { get; set; }

public bool WaitUntilCompleted { get; set; }
}

private class WaitForDependenciesRunningHook(DistributedApplicationExecutionContext executionContext,
ResourceNotificationService resourceNotificationService) :
IDistributedApplicationLifecycleHook,
IAsyncDisposable
{
private readonly CancellationTokenSource _cts = new();

public Task BeforeStartAsync(DistributedApplicationModel appModel, CancellationToken cancellationToken = default)
{
// We don't need to execute any of this logic in publish mode
if (executionContext.IsPublishMode)
{
return Task.CompletedTask;
}

// The global list of resources being waited on
var waitingResources = new ConcurrentDictionary<IResource, ConcurrentDictionary<WaitOnAnnotation, TaskCompletionSource>>();

// For each resource, add an environment callback that waits for dependencies to be running
foreach (var r in appModel.Resources)
{
var resourcesToWaitOn = r.Annotations.OfType<WaitOnAnnotation>().ToLookup(a => a.Resource);

if (resourcesToWaitOn.Count == 0)
{
continue;
}

// Abuse the environment callback to wait for dependencies to be running

r.Annotations.Add(new EnvironmentCallbackAnnotation(async context =>
{
var dependencies = new List<Task>();

// Find connection strings and endpoint references and get the resource they point to
foreach (var group in resourcesToWaitOn)
{
var resource = group.Key;

// REVIEW: This logic does not handle cycles in the dependency graph (that would result in a deadlock)

// Don't wait for yourself
if (resource != r && resource is not null)
{
var pendingAnnotations = waitingResources.GetOrAdd(resource, _ => new());

foreach (var waitOn in group)
{
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

async Task Wait()
{
context.Logger?.LogInformation("Waiting for {Resource}.", waitOn.Resource.Name);

await tcs.Task;

context.Logger?.LogInformation("Waiting for {Resource} completed.", waitOn.Resource.Name);
}

pendingAnnotations[waitOn] = tcs;

dependencies.Add(Wait());
}
}
}

await resourceNotificationService.PublishUpdateAsync(r, s => s with
{
State = new("Waiting", KnownResourceStateStyles.Info)
});

await Task.WhenAll(dependencies).WaitAsync(context.CancellationToken);
}));
}

_ = Task.Run(async () =>
{
var stoppingToken = _cts.Token;

// These states are terminal but we need a better way to detect that
static bool IsKnownTerminalState(CustomResourceSnapshot snapshot) =>
snapshot.State == "FailedToStart" ||
snapshot.State == "Exited" ||
snapshot.ExitCode is not null;

// Watch for global resource state changes
await foreach (var resourceEvent in resourceNotificationService.WatchAsync(stoppingToken))
{
if (waitingResources.TryGetValue(resourceEvent.Resource, out var pendingAnnotations))
{
foreach (var (waitOn, tcs) in pendingAnnotations)
{
if (waitOn.States is string[] states && states.Contains(resourceEvent.Snapshot.State?.Text, StringComparer.Ordinal))
{
pendingAnnotations.TryRemove(waitOn, out _);

_ = DoTheHealthCheck(resourceEvent, tcs);
}
else if (waitOn.WaitUntilCompleted)
{
if (IsKnownTerminalState(resourceEvent.Snapshot))
{
pendingAnnotations.TryRemove(waitOn, out _);

_ = DoTheHealthCheck(resourceEvent, tcs);
}
}
else if (waitOn.States is null)
{
if (resourceEvent.Snapshot.State == "Running")
{
pendingAnnotations.TryRemove(waitOn, out _);

_ = DoTheHealthCheck(resourceEvent, tcs);
}
else if (IsKnownTerminalState(resourceEvent.Snapshot))
{
pendingAnnotations.TryRemove(waitOn, out _);

tcs.TrySetException(new Exception($"Dependency {waitOn.Resource.Name} failed to start"));
}
}
}
}
}
},
cancellationToken);

return Task.CompletedTask;
}

private async Task DoTheHealthCheck(ResourceEvent resourceEvent, TaskCompletionSource tcs)
{
var resource = resourceEvent.Resource;

// REVIEW: Right now, every resource does an independent health check, we could instead cache
// the health check result and reuse it for all resources that depend on the same resource


HealthCheckAnnotation? healthCheckAnnotation = null;

// Find the relevant health check annotation. If the resource has a parent, walk up the tree
// until we find the health check annotation.
while (true)
{
// If we find a health check annotation, break out of the loop
if (resource.TryGetLastAnnotation(out healthCheckAnnotation))
{
break;
}

// If the resource has a parent, walk up the tree
if (resource is IResourceWithParent parent)
{
resource = parent.Parent;
}
else
{
break;
}
}

Func<CancellationToken, ValueTask>? operation = null;

if (healthCheckAnnotation?.HealthCheckFactory is { } factory)
{
IHealthCheck? check;

try
{
// TODO: Do need to pass a cancellation token here?
check = await factory(resource, default);

if (check is not null)
{
var context = new HealthCheckContext()
{
Registration = new HealthCheckRegistration("", check, HealthStatus.Unhealthy, [])
};

operation = async (cancellationToken) =>
{
var result = await check.CheckHealthAsync(context, cancellationToken);

if (result.Exception is not null)
{
ExceptionDispatchInfo.Throw(result.Exception);
}

if (result.Status != HealthStatus.Healthy)
{
throw new Exception("Health check failed");
}
};
}
}
catch (Exception ex)
{
tcs.TrySetException(ex);

return;
}
}

try
{
if (operation is not null)
{
var pipeline = CreateResiliencyPipeline();

await pipeline.ExecuteAsync(operation);
}

tcs.TrySetResult();
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}

private static ResiliencePipeline CreateResiliencyPipeline()
{
var retryUntilCancelled = new RetryStrategyOptions()
{
ShouldHandle = new PredicateBuilder().Handle<Exception>(),
BackoffType = DelayBackoffType.Exponential,
MaxRetryAttempts = 5,
UseJitter = true,
MaxDelay = TimeSpan.FromSeconds(30)
};

return new ResiliencePipelineBuilder().AddRetry(retryUntilCancelled).Build();
}

public ValueTask DisposeAsync()
{
_cts.Cancel();
return default;
}
}
}
2 changes: 2 additions & 0 deletions src/Aspire/NCafe.AppHost/NCafe.AppHost.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
<ItemGroup>
<PackageReference Include="Aspire.Hosting.AppHost" />
<PackageReference Include="Aspire.Hosting.RabbitMQ" />
<PackageReference Include="AspNetCore.HealthChecks.EventStore.gRPC" />
<PackageReference Include="AspNetCore.HealthChecks.Rabbitmq" />
</ItemGroup>

<ItemGroup>
Expand Down
Loading

0 comments on commit cb593f2

Please sign in to comment.