Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sfmskywalker committed Jul 3, 2022
2 parents 208510a + d855ec3 commit 8f3adad
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ private async Task AddAttachmentsAsync(BodyBuilder bodyBuilder, CancellationToke

if (attachments != null)
{
if (attachments is string && string.IsNullOrWhiteSpace((string)attachments))
return;

var index = 0;
var attachmentObjects = InterpretAttachmentsModel(attachments);

Expand Down Expand Up @@ -206,4 +209,4 @@ private async Task<HttpResponseMessage> DownloadUrlAsync(Uri url)
return response;
}
}
}
}
21 changes: 16 additions & 5 deletions src/core/Elsa.Core/Activities/ControlFlow/Join/Join.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public enum JoinMode
WaitAll,
WaitAny
}

[ActivityInput(Hint = "True if all blocking activities within the fork should be cleared.", UIHint = ActivityInputUIHints.SingleLine, SupportedSyntaxes = new[] { SyntaxNames.JavaScript, SyntaxNames.Liquid })]
public bool EagerJoin { get; set; }

[ActivityInput(
UIHint = ActivityInputUIHints.Dropdown,
Expand Down Expand Up @@ -106,12 +109,20 @@ private async Task RemoveBlockingActivitiesAsync(WorkflowExecutionContext workfl
blockingActivityAncestors = blockingActivityAncestors.Concat(compositeBlockingActivityAncestors).ToList();
}

// If the fork is inbound in the blocking activity AND the blocking activity is inbound in this Join, then clear it.
var blockingActivityHasInboundFork = fork == null || blockingActivityAncestors.Contains(fork.Id);
var joinActivityHasInboundBlockingActivity = inboundActivities.Contains(blockingActivity.ActivityId);
if (EagerJoin)
{
if (fork == null || blockingActivityAncestors.Contains(fork.Id))
await workflowExecutionContext.RemoveBlockingActivityAsync(blockingActivity);
}
else
{
// If the fork is inbound in the blocking activity AND the blocking activity is inbound in this Join, then clear it.
var blockingActivityHasInboundFork = fork == null || blockingActivityAncestors.Contains(fork.Id);
var joinActivityHasInboundBlockingActivity = inboundActivities.Contains(blockingActivity.ActivityId);

if (blockingActivityHasInboundFork && joinActivityHasInboundBlockingActivity)
await workflowExecutionContext.RemoveBlockingActivityAsync(blockingActivity);
if (blockingActivityHasInboundFork && joinActivityHasInboundBlockingActivity)
await workflowExecutionContext.RemoveBlockingActivityAsync(blockingActivity);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
using Elsa.Models;
using Elsa.Persistence.MongoDb.Serializers;
using Elsa.Services.Models;
using Microsoft.Extensions.Logging;
using MongoDb.Bson.NodaTime;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;

namespace Elsa.Persistence.MongoDb.Services
{
public static class DatabaseRegister
{
public static void RegisterMapsAndSerializers()
public static void RegisterMapsAndSerializers(ILogger logger = null)
{
// In unit tests, the method is called several times, which throws an exception because the entity is already registered
// If an error is thrown, the remaining registrations are no longer processed
Expand All @@ -18,7 +20,7 @@ public static void RegisterMapsAndSerializers()
if (firstPass == false)
return;

RegisterSerializers();
RegisterSerializers(logger);
}

private static bool Map()
Expand All @@ -33,19 +35,19 @@ private static bool Map()
cm.SetIsRootClass(true);
cm.MapIdProperty(x => x.Id);
});

BsonClassMap.RegisterClassMap<WorkflowDefinition>(cm =>
{
cm.MapProperty(p => p.Variables).SetSerializer(VariablesSerializer.Instance);
cm.AutoMap();
});

BsonClassMap.RegisterClassMap<WorkflowInstance>(cm =>
{
cm.MapProperty(p => p.Variables).SetSerializer(VariablesSerializer.Instance);
cm.AutoMap();
});

BsonClassMap.RegisterClassMap<Bookmark>(cm => cm.AutoMap());
BsonClassMap.RegisterClassMap<WorkflowExecutionLogRecord>(cm => cm.AutoMap());
BsonClassMap.RegisterClassMap<WorkflowOutputReference>(cm => cm.AutoMap());
Expand All @@ -58,18 +60,52 @@ private static bool Map()
return true;
}

private static void RegisterSerializers()
private static void RegisterSerializers(ILogger logger = null)
{
if (BsonSerializer.LookupSerializer<VariablesSerializer>() == null)
try
{
BsonSerializer.RegisterSerializer(VariablesSerializer.Instance);
if (BsonSerializer.LookupSerializer<JObjectSerializer>() == null)
}
catch (BsonSerializationException ex)
{
logger?.LogWarning(ex, "Couldn't register {serializer_name}", nameof(VariablesSerializer));
}

try
{
BsonSerializer.RegisterSerializer(JObjectSerializer.Instance);
if (BsonSerializer.LookupSerializer<ObjectSerializer>() == null)
}
catch (BsonSerializationException ex)
{
logger?.LogWarning(ex, "Couldn't register {serializer_name}", nameof(JObjectSerializer));
}

try
{
BsonSerializer.RegisterSerializer(ObjectSerializer.Instance);
if (BsonSerializer.LookupSerializer<TypeSerializer>() == null)
}
catch (BsonSerializationException ex)
{
logger?.LogWarning(ex, "Couldn't register {serializer_name}", nameof(ObjectSerializer));
}

try
{
BsonSerializer.RegisterSerializer(TypeSerializer.Instance);
if (BsonSerializer.LookupSerializer<InstantSerializer>() == null)
}
catch (BsonSerializationException ex)
{
logger?.LogWarning(ex, "Couldn't register {serializer_name}", nameof(TypeSerializer));
}

try
{
BsonSerializer.RegisterSerializer(new InstantSerializer());
}
catch (BsonSerializationException ex)
{
logger?.LogWarning(ex, "Couldn't register {serializer_name}", nameof(InstantSerializer));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System.Collections.Generic;
using Elsa.Activities.Console;
using Elsa.Activities.ControlFlow;
using Elsa.Builders;

namespace Elsa.Core.IntegrationTests.Workflows
{
public class ForkEagerJoinWorkflow : IWorkflow
{
private readonly bool _eagerJoin;

public ForkEagerJoinWorkflow(bool eagerJoin)
{
_eagerJoin = eagerJoin;
}

public void Build(IWorkflowBuilder builder)
{
builder.StartWith<Fork>(
activity => activity.Set(x => x.Branches, new HashSet<string>(new[] { "Branch 1", "Branch 2" })),
fork =>
{
fork.When("Branch 1").SignalReceived("Signal1");
fork.When("Branch 2").ThenNamed("Join");
})
.Add<Join>(join => join.Set(x => x.Mode, Join.JoinMode.WaitAny).Set(x => x.EagerJoin, _eagerJoin)).WithName("Join")
.SignalReceived("Signal3")
.WriteLine("Finished");
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,28 @@ async Task<bool> GetActivityHasExecutedAsync(string name)
Assert.Equal(WorkflowStatus.Finished, workflowInstance.WorkflowStatus);
Assert.True(await GetIsFinishedAsync());
}

[Fact(DisplayName = "Normal join should not eagerly clear blocking activities")]
public async Task Test03()
{
var workflow = new ForkEagerJoinWorkflow(false);
var workflowBlueprint = WorkflowBuilder.Build(workflow);
var workflowResult = await WorkflowStarter.StartWorkflowAsync(workflowBlueprint);
var workflowInstance = workflowResult.WorkflowInstance!;

Assert.Equal(2, workflowInstance.BlockingActivities.Count);
}

[Fact(DisplayName = "Eager join should not clear all blocking activities")]
public async Task Test04()
{
var workflow = new ForkEagerJoinWorkflow(true);
var workflowBlueprint = WorkflowBuilder.Build(workflow);
var workflowResult = await WorkflowStarter.StartWorkflowAsync(workflowBlueprint);
var workflowInstance = workflowResult.WorkflowInstance!;

Assert.Single(workflowInstance.BlockingActivities);
}

private async Task<WorkflowInstance> TriggerSignalAsync(IWorkflowBlueprint workflowBlueprint, WorkflowInstance workflowInstance, string signal)
{
Expand Down

0 comments on commit 8f3adad

Please sign in to comment.