Skip to content

Commit

Permalink
Rework local activity skipping
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Oct 23, 2023
1 parent 96b23b4 commit da79d39
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,8 @@ public WorkflowTaskResult handleWorkflowTask(
}

handleWorkflowTaskImpl(workflowTask, historyIterator);
if (!context.isWorkflowMethodCompleted()) {
processLocalActivityRequests(wftHearbeatDeadline);
}
processLocalActivityRequests(wftHearbeatDeadline);

List<Command> commands = workflowStateMachines.takeCommands();
List<Message> messages = workflowStateMachines.takeMessages();
EnumSet<SdkFlag> newFlags = workflowStateMachines.takeNewSdkFlags();
Expand Down Expand Up @@ -313,6 +312,12 @@ private void processLocalActivityRequests(Deadline wftHeartbeatDeadline)
throws InterruptedException, Throwable {

while (true) {
// Scheduling or handling any local activities after the workflow method has returned
// can result in commands being generated after the CompleteWorkflowExecution command
// which the server does not allow.
if (context.isWorkflowMethodCompleted()) {
break;
}
List<ExecuteLocalActivityParameters> laRequests =
workflowStateMachines.takeLocalActivityRequests();
localActivityTaskCount += laRequests.size();
Expand Down Expand Up @@ -363,7 +368,8 @@ private void processLocalActivityRequests(Deadline wftHeartbeatDeadline)
// it's safe to call and discard the result of takeLocalActivityRequests() here, because if it's
// not empty - we are in trouble anyway
Preconditions.checkState(
workflowStateMachines.takeLocalActivityRequests().isEmpty(),
workflowStateMachines.takeLocalActivityRequests().isEmpty()
|| context.isWorkflowMethodCompleted(),
"[BUG] Local activities requests from the last event loop were not drained "
+ "and accounted in the outstanding local activities counter");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public String execute() {
Workflow.mutableSideEffect(
"id1", Integer.class, (o, n) -> Objects.equals(n, o), () -> 0);
});

return "done";
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@

import io.temporal.activity.LocalActivityOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.Async;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.*;
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestWorkflows;
import java.time.Duration;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(JUnitParamsRunner.class)
public class LocalActivityInTheLastWorkflowTaskTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
Expand All @@ -41,13 +43,19 @@ public class LocalActivityInTheLastWorkflowTaskTest {
.build();

@Test
public void testLocalActivityInTheLastWorkflowTask() {
TestWorkflows.TestWorkflowReturnString client =
testWorkflowRule.newWorkflowStub(TestWorkflows.TestWorkflowReturnString.class);
assertEquals("done", client.execute());
@Parameters({"true", "false"})
public void testLocalActivityInTheLastWorkflowTask(boolean blockOnLA) {
TestWorkflow client = testWorkflowRule.newWorkflowStub(TestWorkflow.class);
assertEquals("done", client.execute(blockOnLA));
}

public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowReturnString {
@WorkflowInterface
public interface TestWorkflow {
@WorkflowMethod
String execute(boolean blockOnLA);
}

public static class TestWorkflowImpl implements TestWorkflow {

private final TestActivities.VariousTestActivities activities =
Workflow.newLocalActivityStub(
Expand All @@ -57,7 +65,12 @@ public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowReturn
.build());

@Override
public String execute() {
public String execute(boolean blockOnLA) {
if (blockOnLA) {
Promise promise = Async.procedure(activities::sleepActivity, (long) 100, 0);
Async.procedure(activities::sleepActivity, (long) 1000, 0);
promise.get();
}
Async.procedure(activities::sleepActivity, (long) 1000, 0);
return "done";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@
import io.temporal.activity.LocalActivityOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.*;
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestWorkflows;
import java.time.Duration;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(JUnitParamsRunner.class)
public class SignalWithLocalActivityInTheLastWorkflowTaskTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
Expand All @@ -41,15 +44,25 @@ public class SignalWithLocalActivityInTheLastWorkflowTaskTest {
.build();

@Test
public void testSignalWithLocalActivityInTheLastWorkflowTask() {
TestWorkflows.TestSignaledWorkflow client =
testWorkflowRule.newWorkflowStub(TestWorkflows.TestSignaledWorkflow.class);
@Parameters({"true", "false"})
public void testSignalWithLocalActivityInTheLastWorkflowTask(Boolean waitOnLA) {
TestSignaledWorkflow client = testWorkflowRule.newWorkflowStub(TestSignaledWorkflow.class);
WorkflowStub.fromTyped(client)
.signalWithStart("testSignal", new String[] {"signalValue"}, new String[] {});
.signalWithStart("testSignal", new String[] {"signalValue"}, new Boolean[] {waitOnLA});
assertEquals("done", client.execute());
}

public static class TestSignalWorkflowImpl implements TestWorkflows.TestSignaledWorkflow {
@WorkflowInterface
public interface TestSignaledWorkflow {

@WorkflowMethod
String execute();

@SignalMethod
void signal(boolean waitOnLA);
}

public static class TestSignalWorkflowImpl implements TestSignaledWorkflow {

private final TestActivities.VariousTestActivities activities =
Workflow.newLocalActivityStub(
Expand All @@ -64,7 +77,13 @@ public String execute() {
}

@Override
public void signal(String arg) {
public void signal(boolean waitOnLA) {
if (waitOnLA) {
Promise promise = Async.procedure(activities::sleepActivity, (long) 100, 0);
Async.procedure(activities::sleepActivity, (long) 1000, 0);
promise.get();
}

activities.sleepActivity(1000, 0);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,36 @@
import io.temporal.activity.LocalActivityOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.*;
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestWorkflows;
import java.time.Duration;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(JUnitParamsRunner.class)
public class UpdateWithLocalActivityInTheLastWorkflowTaskTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestSimpleWorkflowWithUpdateImpl.class)
.setWorkflowTypes(WorkflowWithUpdateImpl.class)
.setActivityImplementations(new TestActivities.TestActivitiesImpl())
.build();

@Test
public void testUpdateWithLocalActivityInTheLastWorkflowTask() throws InterruptedException {
TestWorkflows.SimpleWorkflowWithUpdate client =
testWorkflowRule.newWorkflowStub(TestWorkflows.SimpleWorkflowWithUpdate.class);
@Parameters({"true", "false"})
public void testUpdateWithLocalActivityInTheLastWorkflowTask(Boolean waitOnLA)
throws InterruptedException {
WorkflowWithUpdate client = testWorkflowRule.newWorkflowStub(WorkflowWithUpdate.class);

WorkflowStub.fromTyped(client).start();
Thread asyncUpdate =
new Thread(
() -> {
try {
System.out.println("Sending update");
client.update("Update");
client.update(waitOnLA);
} catch (Exception e) {
}
});
Expand All @@ -60,8 +63,17 @@ public void testUpdateWithLocalActivityInTheLastWorkflowTask() throws Interrupte
asyncUpdate.interrupt();
}

public static class TestSimpleWorkflowWithUpdateImpl
implements TestWorkflows.SimpleWorkflowWithUpdate {
@WorkflowInterface
public interface WorkflowWithUpdate {

@WorkflowMethod
String execute();

@UpdateMethod
String update(Boolean waitOnLA);
}

public static class WorkflowWithUpdateImpl implements WorkflowWithUpdate {
Boolean finish = false;

private final TestActivities.VariousTestActivities activities =
Expand All @@ -78,7 +90,12 @@ public String execute() {
}

@Override
public String update(String value) {
public String update(Boolean waitOnLA) {
if (waitOnLA) {
Promise promise = Async.procedure(activities::sleepActivity, (long) 100, 0);
Async.procedure(activities::sleepActivity, (long) 1000, 0);
promise.get();
}
finish = true;
activities.sleepActivity(1000, 0);
return "update";
Expand Down

0 comments on commit da79d39

Please sign in to comment.