From cb593f2ffc17d1f9724d013ad0bd475db7d86cfe Mon Sep 17 00:00:00 2001 From: Fredi Machado Date: Sun, 19 May 2024 08:57:59 +1000 Subject: [PATCH] Healthchecks and WaitFor implemented by @davidfowl - Enable waiting for dependencies before starting resources Original code: https://github.com/davidfowl/WaitForDependenciesAspire --- Directory.Packages.props | 11 +- .../EventStoreHealthCheckExtensions.cs | 15 + .../Hosting/HealthCheckAnnotation.cs | 30 ++ .../RabbitMQResourceHealthCheckExtensions.cs | 15 + .../Hosting/WaitForDependenciesExtensions.cs | 300 ++++++++++++++++++ src/Aspire/NCafe.AppHost/NCafe.AppHost.csproj | 2 + src/Aspire/NCafe.AppHost/Program.cs | 17 +- src/Barista/NCafe.Barista.Api/Program.cs | 2 + src/Cashier/NCafe.Cashier.Api/Program.cs | 2 + .../NCafe.Infrastructure.csproj | 1 + 10 files changed, 387 insertions(+), 8 deletions(-) create mode 100644 src/Aspire/NCafe.AppHost/Hosting/EventStoreHealthCheckExtensions.cs create mode 100644 src/Aspire/NCafe.AppHost/Hosting/HealthCheckAnnotation.cs create mode 100644 src/Aspire/NCafe.AppHost/Hosting/RabbitMQResourceHealthCheckExtensions.cs create mode 100644 src/Aspire/NCafe.AppHost/Hosting/WaitForDependenciesExtensions.cs diff --git a/Directory.Packages.props b/Directory.Packages.props index 560facf..5054cdb 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -7,7 +7,10 @@ - + + + + @@ -23,9 +26,9 @@ - - - + + + diff --git a/src/Aspire/NCafe.AppHost/Hosting/EventStoreHealthCheckExtensions.cs b/src/Aspire/NCafe.AppHost/Hosting/EventStoreHealthCheckExtensions.cs new file mode 100644 index 0000000..6d37af1 --- /dev/null +++ b/src/Aspire/NCafe.AppHost/Hosting/EventStoreHealthCheckExtensions.cs @@ -0,0 +1,15 @@ +using HealthChecks.EventStore.gRPC; + +namespace Aspire.Hosting; + +public static class EventStoreHealthCheckExtensions +{ + /// + /// Adds a health check to the EventStore resource. + /// + public static IResourceBuilder WithHealthCheck(this IResourceBuilder builder) + { + return builder.WithAnnotation( + HealthCheckAnnotation.Create(connectionString => new EventStoreHealthCheck(connectionString))); + } +} diff --git a/src/Aspire/NCafe.AppHost/Hosting/HealthCheckAnnotation.cs b/src/Aspire/NCafe.AppHost/Hosting/HealthCheckAnnotation.cs new file mode 100644 index 0000000..cdea1d1 --- /dev/null +++ b/src/Aspire/NCafe.AppHost/Hosting/HealthCheckAnnotation.cs @@ -0,0 +1,30 @@ +using Microsoft.Extensions.Diagnostics.HealthChecks; + +namespace Aspire.Hosting; + +/// +/// An annotation that associates a health check factory with a resource +/// +/// A function that creates the health check +public class HealthCheckAnnotation(Func> healthCheckFactory) : IResourceAnnotation +{ + public Func> HealthCheckFactory { get; } = healthCheckFactory; + + public static HealthCheckAnnotation Create(Func connectionStringFactory) + { + return new(async (resource, token) => + { + if (resource is not IResourceWithConnectionString c) + { + return null; + } + + if (await c.GetConnectionStringAsync(token) is not string cs) + { + return null; + } + + return connectionStringFactory(cs); + }); + } +} diff --git a/src/Aspire/NCafe.AppHost/Hosting/RabbitMQResourceHealthCheckExtensions.cs b/src/Aspire/NCafe.AppHost/Hosting/RabbitMQResourceHealthCheckExtensions.cs new file mode 100644 index 0000000..c71db05 --- /dev/null +++ b/src/Aspire/NCafe.AppHost/Hosting/RabbitMQResourceHealthCheckExtensions.cs @@ -0,0 +1,15 @@ +using HealthChecks.RabbitMQ; + +namespace Aspire.Hosting; + +public static class RabbitMQResourceHealthCheckExtensions +{ + /// + /// Adds a health check to the RabbitMQ server resource. + /// + public static IResourceBuilder WithHealthCheck(this IResourceBuilder builder) + { + return builder.WithAnnotation( + HealthCheckAnnotation.Create(cs => new RabbitMQHealthCheck(new RabbitMQHealthCheckOptions { ConnectionUri = new(cs) }))); + } +} diff --git a/src/Aspire/NCafe.AppHost/Hosting/WaitForDependenciesExtensions.cs b/src/Aspire/NCafe.AppHost/Hosting/WaitForDependenciesExtensions.cs new file mode 100644 index 0000000..533fa95 --- /dev/null +++ b/src/Aspire/NCafe.AppHost/Hosting/WaitForDependenciesExtensions.cs @@ -0,0 +1,300 @@ +using System.Collections.Concurrent; +using System.Runtime.ExceptionServices; +using Aspire.Hosting.Lifecycle; +using Microsoft.Extensions.Diagnostics.HealthChecks; +using Microsoft.Extensions.Logging; +using Polly; +using Polly.Retry; + +namespace Aspire.Hosting; + +public static class WaitForDependenciesExtensions +{ + /// + /// Wait for a resource to be running before starting another resource. + /// + /// The resource type. + /// The resource builder. + /// The resource to wait for. + public static IResourceBuilder WaitFor(this IResourceBuilder builder, IResourceBuilder other) + where T : IResource + { + builder.ApplicationBuilder.AddWaitForDependencies(); + return builder.WithAnnotation(new WaitOnAnnotation(other.Resource)); + } + + /// + /// Wait for a resource to run to completion before starting another resource. + /// + /// The resource type. + /// The resource builder. + /// The resource to wait for. + public static IResourceBuilder WaitForCompletion(this IResourceBuilder builder, IResourceBuilder other) + where T : IResource + { + builder.ApplicationBuilder.AddWaitForDependencies(); + return builder.WithAnnotation(new WaitOnAnnotation(other.Resource) { WaitUntilCompleted = true }); + } + + /// + /// Adds a lifecycle hook that waits for all dependencies to be "running" before starting resources. If that resource + /// has a health check, it will be executed before the resource is considered "running". + /// + /// The . + private static IDistributedApplicationBuilder AddWaitForDependencies(this IDistributedApplicationBuilder builder) + { + builder.Services.TryAddLifecycleHook(); + return builder; + } + + private class WaitOnAnnotation(IResource resource) : IResourceAnnotation + { + public IResource Resource { get; } = resource; + + public string[]? States { get; set; } + + public bool WaitUntilCompleted { get; set; } + } + + private class WaitForDependenciesRunningHook(DistributedApplicationExecutionContext executionContext, + ResourceNotificationService resourceNotificationService) : + IDistributedApplicationLifecycleHook, + IAsyncDisposable + { + private readonly CancellationTokenSource _cts = new(); + + public Task BeforeStartAsync(DistributedApplicationModel appModel, CancellationToken cancellationToken = default) + { + // We don't need to execute any of this logic in publish mode + if (executionContext.IsPublishMode) + { + return Task.CompletedTask; + } + + // The global list of resources being waited on + var waitingResources = new ConcurrentDictionary>(); + + // For each resource, add an environment callback that waits for dependencies to be running + foreach (var r in appModel.Resources) + { + var resourcesToWaitOn = r.Annotations.OfType().ToLookup(a => a.Resource); + + if (resourcesToWaitOn.Count == 0) + { + continue; + } + + // Abuse the environment callback to wait for dependencies to be running + + r.Annotations.Add(new EnvironmentCallbackAnnotation(async context => + { + var dependencies = new List(); + + // Find connection strings and endpoint references and get the resource they point to + foreach (var group in resourcesToWaitOn) + { + var resource = group.Key; + + // REVIEW: This logic does not handle cycles in the dependency graph (that would result in a deadlock) + + // Don't wait for yourself + if (resource != r && resource is not null) + { + var pendingAnnotations = waitingResources.GetOrAdd(resource, _ => new()); + + foreach (var waitOn in group) + { + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + async Task Wait() + { + context.Logger?.LogInformation("Waiting for {Resource}.", waitOn.Resource.Name); + + await tcs.Task; + + context.Logger?.LogInformation("Waiting for {Resource} completed.", waitOn.Resource.Name); + } + + pendingAnnotations[waitOn] = tcs; + + dependencies.Add(Wait()); + } + } + } + + await resourceNotificationService.PublishUpdateAsync(r, s => s with + { + State = new("Waiting", KnownResourceStateStyles.Info) + }); + + await Task.WhenAll(dependencies).WaitAsync(context.CancellationToken); + })); + } + + _ = Task.Run(async () => + { + var stoppingToken = _cts.Token; + + // These states are terminal but we need a better way to detect that + static bool IsKnownTerminalState(CustomResourceSnapshot snapshot) => + snapshot.State == "FailedToStart" || + snapshot.State == "Exited" || + snapshot.ExitCode is not null; + + // Watch for global resource state changes + await foreach (var resourceEvent in resourceNotificationService.WatchAsync(stoppingToken)) + { + if (waitingResources.TryGetValue(resourceEvent.Resource, out var pendingAnnotations)) + { + foreach (var (waitOn, tcs) in pendingAnnotations) + { + if (waitOn.States is string[] states && states.Contains(resourceEvent.Snapshot.State?.Text, StringComparer.Ordinal)) + { + pendingAnnotations.TryRemove(waitOn, out _); + + _ = DoTheHealthCheck(resourceEvent, tcs); + } + else if (waitOn.WaitUntilCompleted) + { + if (IsKnownTerminalState(resourceEvent.Snapshot)) + { + pendingAnnotations.TryRemove(waitOn, out _); + + _ = DoTheHealthCheck(resourceEvent, tcs); + } + } + else if (waitOn.States is null) + { + if (resourceEvent.Snapshot.State == "Running") + { + pendingAnnotations.TryRemove(waitOn, out _); + + _ = DoTheHealthCheck(resourceEvent, tcs); + } + else if (IsKnownTerminalState(resourceEvent.Snapshot)) + { + pendingAnnotations.TryRemove(waitOn, out _); + + tcs.TrySetException(new Exception($"Dependency {waitOn.Resource.Name} failed to start")); + } + } + } + } + } + }, + cancellationToken); + + return Task.CompletedTask; + } + + private async Task DoTheHealthCheck(ResourceEvent resourceEvent, TaskCompletionSource tcs) + { + var resource = resourceEvent.Resource; + + // REVIEW: Right now, every resource does an independent health check, we could instead cache + // the health check result and reuse it for all resources that depend on the same resource + + + HealthCheckAnnotation? healthCheckAnnotation = null; + + // Find the relevant health check annotation. If the resource has a parent, walk up the tree + // until we find the health check annotation. + while (true) + { + // If we find a health check annotation, break out of the loop + if (resource.TryGetLastAnnotation(out healthCheckAnnotation)) + { + break; + } + + // If the resource has a parent, walk up the tree + if (resource is IResourceWithParent parent) + { + resource = parent.Parent; + } + else + { + break; + } + } + + Func? operation = null; + + if (healthCheckAnnotation?.HealthCheckFactory is { } factory) + { + IHealthCheck? check; + + try + { + // TODO: Do need to pass a cancellation token here? + check = await factory(resource, default); + + if (check is not null) + { + var context = new HealthCheckContext() + { + Registration = new HealthCheckRegistration("", check, HealthStatus.Unhealthy, []) + }; + + operation = async (cancellationToken) => + { + var result = await check.CheckHealthAsync(context, cancellationToken); + + if (result.Exception is not null) + { + ExceptionDispatchInfo.Throw(result.Exception); + } + + if (result.Status != HealthStatus.Healthy) + { + throw new Exception("Health check failed"); + } + }; + } + } + catch (Exception ex) + { + tcs.TrySetException(ex); + + return; + } + } + + try + { + if (operation is not null) + { + var pipeline = CreateResiliencyPipeline(); + + await pipeline.ExecuteAsync(operation); + } + + tcs.TrySetResult(); + } + catch (Exception ex) + { + tcs.TrySetException(ex); + } + } + + private static ResiliencePipeline CreateResiliencyPipeline() + { + var retryUntilCancelled = new RetryStrategyOptions() + { + ShouldHandle = new PredicateBuilder().Handle(), + BackoffType = DelayBackoffType.Exponential, + MaxRetryAttempts = 5, + UseJitter = true, + MaxDelay = TimeSpan.FromSeconds(30) + }; + + return new ResiliencePipelineBuilder().AddRetry(retryUntilCancelled).Build(); + } + + public ValueTask DisposeAsync() + { + _cts.Cancel(); + return default; + } + } +} diff --git a/src/Aspire/NCafe.AppHost/NCafe.AppHost.csproj b/src/Aspire/NCafe.AppHost/NCafe.AppHost.csproj index 4285a88..b2802ae 100644 --- a/src/Aspire/NCafe.AppHost/NCafe.AppHost.csproj +++ b/src/Aspire/NCafe.AppHost/NCafe.AppHost.csproj @@ -11,6 +11,8 @@ + + diff --git a/src/Aspire/NCafe.AppHost/Program.cs b/src/Aspire/NCafe.AppHost/Program.cs index d92fa1a..f068d2e 100644 --- a/src/Aspire/NCafe.AppHost/Program.cs +++ b/src/Aspire/NCafe.AppHost/Program.cs @@ -1,19 +1,28 @@ var builder = DistributedApplication.CreateBuilder(args); -var rabbitMq = builder.AddRabbitMQ("rabbitmq"); var eventStore = builder.AddEventStore("eventstore") + .WithHealthCheck() .WithDataVolume(); +var rabbitMq = builder.AddRabbitMQ("rabbitmq") + .WithHealthCheck() + .WithManagementPlugin(); + var adminProject = builder.AddProject("admin-api") - .WithReference(eventStore); + .WithReference(eventStore) + .WaitFor(eventStore); var baristaProject = builder.AddProject("barista-api") .WithReference(eventStore) - .WithReference(rabbitMq); + .WithReference(rabbitMq) + .WaitFor(eventStore) + .WaitFor(rabbitMq); var cashierProject = builder.AddProject("cashier-api") .WithReference(eventStore) - .WithReference(rabbitMq); + .WithReference(rabbitMq) + .WaitFor(eventStore) + .WaitFor(rabbitMq); var webUiProject = builder.AddProject("web-ui") .WithExternalHttpEndpoints(); diff --git a/src/Barista/NCafe.Barista.Api/Program.cs b/src/Barista/NCafe.Barista.Api/Program.cs index 173a666..6d04ccc 100644 --- a/src/Barista/NCafe.Barista.Api/Program.cs +++ b/src/Barista/NCafe.Barista.Api/Program.cs @@ -13,6 +13,8 @@ builder.AddServiceDefaults(); +builder.AddRabbitMQClient("rabbitmq"); + // Add services to the container. builder.Services.AddEventStoreRepository(builder.Configuration) .AddCommandHandlers() diff --git a/src/Cashier/NCafe.Cashier.Api/Program.cs b/src/Cashier/NCafe.Cashier.Api/Program.cs index 9f88ead..d7651ac 100644 --- a/src/Cashier/NCafe.Cashier.Api/Program.cs +++ b/src/Cashier/NCafe.Cashier.Api/Program.cs @@ -10,6 +10,8 @@ builder.AddServiceDefaults(); +builder.AddRabbitMQClient("rabbitmq"); + // Add services to the container. builder.Services.AddEventStoreRepository(builder.Configuration) .AddCommandHandlers() diff --git a/src/Common/NCafe.Infrastructure/NCafe.Infrastructure.csproj b/src/Common/NCafe.Infrastructure/NCafe.Infrastructure.csproj index dd8280f..6706b1e 100644 --- a/src/Common/NCafe.Infrastructure/NCafe.Infrastructure.csproj +++ b/src/Common/NCafe.Infrastructure/NCafe.Infrastructure.csproj @@ -6,6 +6,7 @@ +