Skip to content

Commit

Permalink
If eager join is set, clear scheduled activities on join (elsa-workfl…
Browse files Browse the repository at this point in the history
  • Loading branch information
martinhjulstrom authored Jul 11, 2022
1 parent 1f9742b commit 520e3d3
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 6 deletions.
24 changes: 24 additions & 0 deletions src/core/Elsa.Core/Activities/ControlFlow/Join/Join.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ protected override async ValueTask<IActivityExecutionResult> OnExecuteAsync(Acti

await RemoveBlockingActivitiesAsync(workflowExecutionContext, owningFork);
await RemoveScopeActivitiesAsync(workflowExecutionContext, ancestors, owningFork);
RemoveScheduledActivitiesAsync(workflowExecutionContext, owningFork);

// Clear the recorded inbound transitions. This is necessary in case we're in a looping construct.
InboundTransitions = new List<string>();
Expand Down Expand Up @@ -125,6 +126,29 @@ private async Task RemoveBlockingActivitiesAsync(WorkflowExecutionContext workfl
}
}
}

private void RemoveScheduledActivitiesAsync(WorkflowExecutionContext workflowExecutionContext, IActivityBlueprint? fork)
{
if (!EagerJoin) return;

var scheduledActivities = workflowExecutionContext.WorkflowInstance.ScheduledActivities.ToList();

foreach (var scheduledActivity in scheduledActivities)
{
var blockingActivityBlueprint = workflowExecutionContext.WorkflowBlueprint.GetActivity(scheduledActivity.ActivityId)!;
var blockingActivityAncestors = workflowExecutionContext.GetInboundActivityPath(scheduledActivity.ActivityId).ToList();

// Include composite activities in the equation.
if (blockingActivityBlueprint.Parent != null)
{
var compositeBlockingActivityAncestors = workflowExecutionContext.GetInboundActivityPath(blockingActivityBlueprint.Parent.Id).ToList();
blockingActivityAncestors = blockingActivityAncestors.Concat(compositeBlockingActivityAncestors).ToList();
}

if (fork == null || blockingActivityAncestors.Contains(fork.Id))
workflowExecutionContext.WorkflowInstance.ScheduledActivities.Remove(scheduledActivity);
}
}

private async Task RemoveScopeActivitiesAsync(WorkflowExecutionContext workflowExecutionContext, ICollection<IActivityBlueprint> ancestors, IEnumerable<IActivityBlueprint> forks)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Linq;
using Elsa.Activities.Console;
using Elsa.Activities.ControlFlow;
using Elsa.Builders;
Expand All @@ -8,16 +9,21 @@ namespace Elsa.Core.IntegrationTests.Workflows
public class ForkEagerJoinWorkflow : IWorkflow
{
private readonly bool _eagerJoin;

public ForkEagerJoinWorkflow(bool eagerJoin)
private readonly string[] _branches;

public ForkEagerJoinWorkflow(bool eagerJoin, bool reverseBranchOrder)
{
_eagerJoin = eagerJoin;

_branches = new[] { "Branch 1", "Branch 2" };

if (reverseBranchOrder) _branches = _branches.Reverse().ToArray();
}

public void Build(IWorkflowBuilder builder)
{
builder.StartWith<Fork>(
activity => activity.Set(x => x.Branches, new HashSet<string>(new[] { "Branch 1", "Branch 2" })),
activity => activity.Set(x => x.Branches, new HashSet<string>(_branches)),
fork =>
{
fork.When("Branch 1").SignalReceived("Signal1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,46 @@ async Task<bool> GetActivityHasExecutedAsync(string name)
[Fact(DisplayName = "Normal join should not eagerly clear blocking activities")]
public async Task Test03()
{
var workflow = new ForkEagerJoinWorkflow(false);
var workflow = new ForkEagerJoinWorkflow(false, false);
var workflowBlueprint = WorkflowBuilder.Build(workflow);
var workflowResult = await WorkflowStarter.StartWorkflowAsync(workflowBlueprint);
var workflowInstance = workflowResult.WorkflowInstance!;

Assert.Equal(2, workflowInstance.BlockingActivities.Count);
}

[Fact(DisplayName = "Eager join should not clear all blocking activities")]
[Fact(DisplayName = "Eager join should clear all blocking activities")]
public async Task Test04()
{
var workflow = new ForkEagerJoinWorkflow(true);
var workflow = new ForkEagerJoinWorkflow(true, false);
var workflowBlueprint = WorkflowBuilder.Build(workflow);
var workflowResult = await WorkflowStarter.StartWorkflowAsync(workflowBlueprint);
var workflowInstance = workflowResult.WorkflowInstance!;

Assert.Single(workflowInstance.BlockingActivities);
}

[Fact(DisplayName = "Eager join should clear blocking and scheduled activities")]
public async Task Test05()
{
var workflow = new ForkEagerJoinWorkflow(true, true);
var workflowBlueprint = WorkflowBuilder.Build(workflow);
var workflowResult = await WorkflowStarter.StartWorkflowAsync(workflowBlueprint);
var workflowInstance = workflowResult.WorkflowInstance!;

Assert.Single(workflowInstance.BlockingActivities);
}

[Fact(DisplayName = "Normal join should not clear scheduled activities")]
public async Task Test06()
{
var workflow = new ForkEagerJoinWorkflow(false, true);
var workflowBlueprint = WorkflowBuilder.Build(workflow);
var workflowResult = await WorkflowStarter.StartWorkflowAsync(workflowBlueprint);
var workflowInstance = workflowResult.WorkflowInstance!;

Assert.Equal(2, workflowInstance.BlockingActivities.Count);
}

private async Task<WorkflowInstance> TriggerSignalAsync(IWorkflowBlueprint workflowBlueprint, WorkflowInstance workflowInstance, string signal)
{
Expand Down
27 changes: 27 additions & 0 deletions test/unit/Elsa.UnitTests/Models/SimpleStackTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using Xunit;

namespace Elsa.Models;

public class SimpleStackTests
{
[Fact(DisplayName = "Testing simple stack push, pop, peek, and remove operations")]
public void PushPopAndRemove()
{
var sut = new SimpleStack<string>();

sut.Push("1");
sut.Push("2");
sut.Push("3");
sut.Push("4");

Assert.Equal("4", sut.Peek());
Assert.Equal("4", sut.Peek());
Assert.Equal("4", sut.Pop());
Assert.Equal("3", sut.Peek());

sut.Remove("2");

Assert.Equal("3", sut.Pop());
Assert.Equal("1", sut.Pop());
}
}

0 comments on commit 520e3d3

Please sign in to comment.