Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Jan 5, 2024
1 parent 48092ae commit ccd398e
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ boolean getVersion(
/**
* @return eventId of the last / currently active workflow task of this workflow
*/
long getCurrentWorkflowTaskStartedEventId();
long getLastWorkflowTaskStartedEventId();

/**
* @return size of Workflow history in bytes up until the current moment of execution. This value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,10 @@ public boolean tryUseSdkFlag(SdkFlag flag) {
public Optional<String> getCurrentBuildId() {
String curTaskBID = workflowStateMachines.getCurrentTaskBuildId();
// The current task started id == 0 check is to avoid setting the build id to this worker's ID
// in the
// event we're servicing a query, in which case we do want to use the ID from history.
if (!workflowStateMachines.isReplaying() && getCurrentWorkflowTaskStartedEventId() != 0) {
// in the event we're
// servicing a query, in which case we do want to use the ID from history.
if (!workflowStateMachines.isReplaying()
&& workflowStateMachines.getCurrentWFTStartedEventId() != 0) {
curTaskBID = workerOptions.getBuildId();
}
return Optional.ofNullable(curTaskBID);
Expand Down Expand Up @@ -368,8 +369,8 @@ public Map<String, Payload> getHeader() {
}

@Override
public long getCurrentWorkflowTaskStartedEventId() {
return workflowStateMachines.getCurrentStartedEventId();
public long getLastWorkflowTaskStartedEventId() {
return workflowStateMachines.getLastWFTStartedEventId();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public WorkflowTaskResult handleWorkflowTask(
TimeUnit.NANOSECONDS);

if (workflowTask.getPreviousStartedEventId()
< workflowStateMachines.getCurrentStartedEventId()) {
< workflowStateMachines.getLastWFTStartedEventId()) {
// if previousStartedEventId < currentStartedEventId - the last workflow task handled by
// these state machines is ahead of the last handled workflow task known by the server.
// Something is off, the server lost progress.
Expand Down Expand Up @@ -219,7 +219,7 @@ public QueryResult handleDirectQueryWorkflowTask(

@Override
public void setCurrentStartedEvenId(Long eventId) {
workflowStateMachines.setCurrentStartedEventId(eventId);
workflowStateMachines.setLastWFTStartedEventId(eventId);
}

private void handleWorkflowTaskImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ enum HandleEventStatus {
private long workflowTaskStartedEventId;

/** EventId of the last WorkflowTaskStarted event handled by these state machines. */
private long currentStartedEventId;
private long lastWFTStartedEventId;

/** The Build ID used in the current WFT if already completed and set (may be null) */
private String currentTaskBuildId;
Expand Down Expand Up @@ -205,21 +205,25 @@ public void setWorklfowStartedEventId(long workflowTaskStartedEventId) {
this.workflowTaskStartedEventId = workflowTaskStartedEventId;
}

public void setCurrentStartedEventId(long eventId) {
public void setLastWFTStartedEventId(long eventId) {
// We have to drop any state machines (which should only be one workflow task machine)
// created when handling the speculative workflow task
for (long i = this.lastHandledEventId; i > eventId; i--) {
stateMachines.remove(i);
}
this.currentStartedEventId = eventId;
this.lastWFTStartedEventId = eventId;
// When we reset the event ID on a speculative WFT we need to move this counter back
// to the last WFT completed to allow new tasks to be processed. Assume the WFT complete
// always follows the WFT started.
this.lastHandledEventId = eventId + 1;
}

public long getCurrentStartedEventId() {
return currentStartedEventId;
public long getLastWFTStartedEventId() {
return lastWFTStartedEventId;
}

public long getCurrentWFTStartedEventId() {
return workflowTaskStartedEventId;
}

public long getHistorySize() {
Expand Down Expand Up @@ -716,7 +720,7 @@ private long setCurrentTimeMillis(long currentTimeMillis) {
}

public long getLastStartedEventId() {
return currentStartedEventId;
return lastWFTStartedEventId;
}

/**
Expand Down Expand Up @@ -1177,7 +1181,7 @@ public void workflowTaskStarted(
value.nonReplayWorkflowTaskStarted();
}
}
WorkflowStateMachines.this.currentStartedEventId = startedEventId;
WorkflowStateMachines.this.lastWFTStartedEventId = startedEventId;
WorkflowStateMachines.this.historySize = historySize;
WorkflowStateMachines.this.isContinueAsNewSuggested = isContinueAsNewSuggested;

Expand Down Expand Up @@ -1295,6 +1299,6 @@ private String createEventHandlingMessage(HistoryEvent event) {
private String createShortCurrentStateMessagePostfix() {
return String.format(
"{WorkflowTaskStartedEventId=%s, CurrentStartedEventId=%s}",
this.workflowTaskStartedEventId, this.currentStartedEventId);
this.workflowTaskStartedEventId, this.lastWFTStartedEventId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public String getCronSchedule() {

@Override
public long getHistoryLength() {
return context.getCurrentWorkflowTaskStartedEventId();
return context.getLastWorkflowTaskStartedEventId();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import io.temporal.activity.Activity;
import io.temporal.activity.ActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.BuildIdOperation;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
Expand All @@ -45,7 +46,6 @@ public class BuildIdVersioningTest {
SDKTestWorkflowRule.newBuilder()
.setWorkerOptions(
WorkerOptions.newBuilder().setBuildId("1.0").setUseBuildIdForVersioning(true).build())
.setWorkflowTypes(BuildIdVersioningTest.TestVersioningWorkflowImpl.class)
.setActivityImplementations(new BuildIdVersioningTest.ActivityImpl())
.setDoNotStart(true)
.build();
Expand All @@ -57,6 +57,10 @@ public void testBuildIdVersioningDataSetProperly() {

String taskQueue = testWorkflowRule.getTaskQueue();
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
testWorkflowRule
.getWorker()
.registerWorkflowImplementationTypes(
BuildIdVersioningTest.TestVersioningWorkflowImpl.class);

// Add 1.0 to the queue
workflowClient.updateWorkerBuildIdCompatability(
Expand Down Expand Up @@ -129,6 +133,10 @@ public void testCurrentBuildIDSetProperly() throws InterruptedException {

String taskQueue = testWorkflowRule.getTaskQueue();
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
testWorkflowRule
.getWorker()
.registerWorkflowImplementationTypes(
BuildIdVersioningTest.TestCurrentBuildIdWorkflow.class);

// Add 1.0 to the queue
workflowClient.updateWorkerBuildIdCompatability(
Expand All @@ -152,7 +160,16 @@ public void testCurrentBuildIDSetProperly() throws InterruptedException {
// Wait for activity to run
ACTIVITY_RAN.waitForSignal();
Assert.assertEquals("1.0", wf1.getState());

testWorkflowRule.getTestEnvironment().shutdown();
workflowClient
.getWorkflowServiceStubs()
.blockingStub()
.resetStickyTaskQueue(
io.temporal.api.workflowservice.v1.ResetStickyTaskQueueRequest.newBuilder()
.setNamespace(testWorkflowRule.getTestEnvironment().getNamespace())
.setExecution(WorkflowExecution.newBuilder().setWorkflowId(workflowId).build())
.build());

// Add 1.1 to the queue
workflowClient.updateWorkerBuildIdCompatability(
Expand All @@ -164,7 +181,7 @@ public void testCurrentBuildIDSetProperly() throws InterruptedException {
w11F.newWorker(
taskQueue,
WorkerOptions.newBuilder().setBuildId("1.1").setUseBuildIdForVersioning(true).build());
w11.registerWorkflowImplementationTypes(BuildIdVersioningTest.TestVersioningWorkflowImpl.class);
w11.registerWorkflowImplementationTypes(BuildIdVersioningTest.TestCurrentBuildIdWorkflow.class);
w11.registerActivitiesImplementations(new BuildIdVersioningTest.ActivityImpl());
w11F.start();

Expand Down Expand Up @@ -219,26 +236,36 @@ public static class TestCurrentBuildIdWorkflow implements TestWorkflows.Queryabl
TestActivities.TestActivity1.class,
ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofSeconds(10)).build());
private boolean doFinish = false;
private String lastBuildId;

@WorkflowMethod
public String execute() {
updateBuildId();
Workflow.sleep(1);
updateBuildId();
if (Workflow.getInfo().getCurrentBuildId().orElse("").equals("1.0")) {
activity.execute("foo");
updateBuildId();
ACTIVITY_RAN.signal();
}
Workflow.await(() -> doFinish);
updateBuildId();
return "Yay done";
}

private void updateBuildId() {
lastBuildId = Workflow.getInfo().getCurrentBuildId().orElse("");
}

@Override
public void mySignal(String arg) {
doFinish = true;
}

@Override
public String getState() {
return Workflow.getInfo().getCurrentBuildId().orElse("");
// Workflow.getInfo isn't accessible in queries, so we do this
return lastBuildId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ public Map<String, Payload> getHeader() {
}

@Override
public long getCurrentWorkflowTaskStartedEventId() {
public long getLastWorkflowTaskStartedEventId() {
return 0;
}

Expand Down

0 comments on commit ccd398e

Please sign in to comment.