From 520e3d3cd165c9b80042b839479265ea0e020a21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Hjulstr=C3=B6m?= <51920152+martinhjulstrom@users.noreply.github.com> Date: Mon, 11 Jul 2022 11:55:35 +0200 Subject: [PATCH] If eager join is set, clear scheduled activities on join (#3177) --- .../Activities/ControlFlow/Join/Join.cs | 24 ++++++++++++++++ .../Workflows/ForkEagerJoinWorkflow.cs | 12 ++++++-- .../Workflows/ForkJoinWorkflowTests.cs | 28 +++++++++++++++++-- .../Elsa.UnitTests/Models/SimpleStackTests.cs | 27 ++++++++++++++++++ 4 files changed, 85 insertions(+), 6 deletions(-) create mode 100644 test/unit/Elsa.UnitTests/Models/SimpleStackTests.cs diff --git a/src/core/Elsa.Core/Activities/ControlFlow/Join/Join.cs b/src/core/Elsa.Core/Activities/ControlFlow/Join/Join.cs index f5bad88a43..82a4f2ba26 100644 --- a/src/core/Elsa.Core/Activities/ControlFlow/Join/Join.cs +++ b/src/core/Elsa.Core/Activities/ControlFlow/Join/Join.cs @@ -71,6 +71,7 @@ protected override async ValueTask OnExecuteAsync(Acti await RemoveBlockingActivitiesAsync(workflowExecutionContext, owningFork); await RemoveScopeActivitiesAsync(workflowExecutionContext, ancestors, owningFork); + RemoveScheduledActivitiesAsync(workflowExecutionContext, owningFork); // Clear the recorded inbound transitions. This is necessary in case we're in a looping construct. InboundTransitions = new List(); @@ -125,6 +126,29 @@ private async Task RemoveBlockingActivitiesAsync(WorkflowExecutionContext workfl } } } + + private void RemoveScheduledActivitiesAsync(WorkflowExecutionContext workflowExecutionContext, IActivityBlueprint? fork) + { + if (!EagerJoin) return; + + var scheduledActivities = workflowExecutionContext.WorkflowInstance.ScheduledActivities.ToList(); + + foreach (var scheduledActivity in scheduledActivities) + { + var blockingActivityBlueprint = workflowExecutionContext.WorkflowBlueprint.GetActivity(scheduledActivity.ActivityId)!; + var blockingActivityAncestors = workflowExecutionContext.GetInboundActivityPath(scheduledActivity.ActivityId).ToList(); + + // Include composite activities in the equation. + if (blockingActivityBlueprint.Parent != null) + { + var compositeBlockingActivityAncestors = workflowExecutionContext.GetInboundActivityPath(blockingActivityBlueprint.Parent.Id).ToList(); + blockingActivityAncestors = blockingActivityAncestors.Concat(compositeBlockingActivityAncestors).ToList(); + } + + if (fork == null || blockingActivityAncestors.Contains(fork.Id)) + workflowExecutionContext.WorkflowInstance.ScheduledActivities.Remove(scheduledActivity); + } + } private async Task RemoveScopeActivitiesAsync(WorkflowExecutionContext workflowExecutionContext, ICollection ancestors, IEnumerable forks) { diff --git a/test/integration/Elsa.Core.IntegrationTests/Workflows/ForkEagerJoinWorkflow.cs b/test/integration/Elsa.Core.IntegrationTests/Workflows/ForkEagerJoinWorkflow.cs index d246ffb4cd..776c52f85e 100644 --- a/test/integration/Elsa.Core.IntegrationTests/Workflows/ForkEagerJoinWorkflow.cs +++ b/test/integration/Elsa.Core.IntegrationTests/Workflows/ForkEagerJoinWorkflow.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using System.Linq; using Elsa.Activities.Console; using Elsa.Activities.ControlFlow; using Elsa.Builders; @@ -8,16 +9,21 @@ namespace Elsa.Core.IntegrationTests.Workflows public class ForkEagerJoinWorkflow : IWorkflow { private readonly bool _eagerJoin; - - public ForkEagerJoinWorkflow(bool eagerJoin) + private readonly string[] _branches; + + public ForkEagerJoinWorkflow(bool eagerJoin, bool reverseBranchOrder) { _eagerJoin = eagerJoin; + + _branches = new[] { "Branch 1", "Branch 2" }; + + if (reverseBranchOrder) _branches = _branches.Reverse().ToArray(); } public void Build(IWorkflowBuilder builder) { builder.StartWith( - activity => activity.Set(x => x.Branches, new HashSet(new[] { "Branch 1", "Branch 2" })), + activity => activity.Set(x => x.Branches, new HashSet(_branches)), fork => { fork.When("Branch 1").SignalReceived("Signal1"); diff --git a/test/integration/Elsa.Core.IntegrationTests/Workflows/ForkJoinWorkflowTests.cs b/test/integration/Elsa.Core.IntegrationTests/Workflows/ForkJoinWorkflowTests.cs index 397c89d793..9658b2c66f 100644 --- a/test/integration/Elsa.Core.IntegrationTests/Workflows/ForkJoinWorkflowTests.cs +++ b/test/integration/Elsa.Core.IntegrationTests/Workflows/ForkJoinWorkflowTests.cs @@ -91,7 +91,7 @@ async Task GetActivityHasExecutedAsync(string name) [Fact(DisplayName = "Normal join should not eagerly clear blocking activities")] public async Task Test03() { - var workflow = new ForkEagerJoinWorkflow(false); + var workflow = new ForkEagerJoinWorkflow(false, false); var workflowBlueprint = WorkflowBuilder.Build(workflow); var workflowResult = await WorkflowStarter.StartWorkflowAsync(workflowBlueprint); var workflowInstance = workflowResult.WorkflowInstance!; @@ -99,16 +99,38 @@ public async Task Test03() Assert.Equal(2, workflowInstance.BlockingActivities.Count); } - [Fact(DisplayName = "Eager join should not clear all blocking activities")] + [Fact(DisplayName = "Eager join should clear all blocking activities")] public async Task Test04() { - var workflow = new ForkEagerJoinWorkflow(true); + var workflow = new ForkEagerJoinWorkflow(true, false); var workflowBlueprint = WorkflowBuilder.Build(workflow); var workflowResult = await WorkflowStarter.StartWorkflowAsync(workflowBlueprint); var workflowInstance = workflowResult.WorkflowInstance!; Assert.Single(workflowInstance.BlockingActivities); } + + [Fact(DisplayName = "Eager join should clear blocking and scheduled activities")] + public async Task Test05() + { + var workflow = new ForkEagerJoinWorkflow(true, true); + var workflowBlueprint = WorkflowBuilder.Build(workflow); + var workflowResult = await WorkflowStarter.StartWorkflowAsync(workflowBlueprint); + var workflowInstance = workflowResult.WorkflowInstance!; + + Assert.Single(workflowInstance.BlockingActivities); + } + + [Fact(DisplayName = "Normal join should not clear scheduled activities")] + public async Task Test06() + { + var workflow = new ForkEagerJoinWorkflow(false, true); + var workflowBlueprint = WorkflowBuilder.Build(workflow); + var workflowResult = await WorkflowStarter.StartWorkflowAsync(workflowBlueprint); + var workflowInstance = workflowResult.WorkflowInstance!; + + Assert.Equal(2, workflowInstance.BlockingActivities.Count); + } private async Task TriggerSignalAsync(IWorkflowBlueprint workflowBlueprint, WorkflowInstance workflowInstance, string signal) { diff --git a/test/unit/Elsa.UnitTests/Models/SimpleStackTests.cs b/test/unit/Elsa.UnitTests/Models/SimpleStackTests.cs new file mode 100644 index 0000000000..163a0bf510 --- /dev/null +++ b/test/unit/Elsa.UnitTests/Models/SimpleStackTests.cs @@ -0,0 +1,27 @@ +using Xunit; + +namespace Elsa.Models; + +public class SimpleStackTests +{ + [Fact(DisplayName = "Testing simple stack push, pop, peek, and remove operations")] + public void PushPopAndRemove() + { + var sut = new SimpleStack(); + + sut.Push("1"); + sut.Push("2"); + sut.Push("3"); + sut.Push("4"); + + Assert.Equal("4", sut.Peek()); + Assert.Equal("4", sut.Peek()); + Assert.Equal("4", sut.Pop()); + Assert.Equal("3", sut.Peek()); + + sut.Remove("2"); + + Assert.Equal("3", sut.Pop()); + Assert.Equal("1", sut.Pop()); + } +} \ No newline at end of file