From ccd398ebf20806806d1a86154abcc2c11e8e5b25 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 4 Jan 2024 18:05:20 -0800 Subject: [PATCH] Fixes --- .../replay/ReplayWorkflowContext.java | 2 +- .../replay/ReplayWorkflowContextImpl.java | 11 ++++--- .../replay/ReplayWorkflowRunTaskHandler.java | 4 +-- .../statemachines/WorkflowStateMachines.java | 20 ++++++----- .../internal/sync/WorkflowInfoImpl.java | 2 +- .../worker/BuildIdVersioningTest.java | 33 +++++++++++++++++-- .../sync/DummySyncWorkflowContext.java | 2 +- 7 files changed, 53 insertions(+), 21 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java index 774a02884..fbe6a7103 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java @@ -338,7 +338,7 @@ boolean getVersion( /** * @return eventId of the last / currently active workflow task of this workflow */ - long getCurrentWorkflowTaskStartedEventId(); + long getLastWorkflowTaskStartedEventId(); /** * @return size of Workflow history in bytes up until the current moment of execution. This value diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java index 3b798aaca..61e8e964d 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java @@ -257,9 +257,10 @@ public boolean tryUseSdkFlag(SdkFlag flag) { public Optional getCurrentBuildId() { String curTaskBID = workflowStateMachines.getCurrentTaskBuildId(); // The current task started id == 0 check is to avoid setting the build id to this worker's ID - // in the - // event we're servicing a query, in which case we do want to use the ID from history. - if (!workflowStateMachines.isReplaying() && getCurrentWorkflowTaskStartedEventId() != 0) { + // in the event we're + // servicing a query, in which case we do want to use the ID from history. + if (!workflowStateMachines.isReplaying() + && workflowStateMachines.getCurrentWFTStartedEventId() != 0) { curTaskBID = workerOptions.getBuildId(); } return Optional.ofNullable(curTaskBID); @@ -368,8 +369,8 @@ public Map getHeader() { } @Override - public long getCurrentWorkflowTaskStartedEventId() { - return workflowStateMachines.getCurrentStartedEventId(); + public long getLastWorkflowTaskStartedEventId() { + return workflowStateMachines.getLastWFTStartedEventId(); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java index facd909a3..01c0448a9 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java @@ -150,7 +150,7 @@ public WorkflowTaskResult handleWorkflowTask( TimeUnit.NANOSECONDS); if (workflowTask.getPreviousStartedEventId() - < workflowStateMachines.getCurrentStartedEventId()) { + < workflowStateMachines.getLastWFTStartedEventId()) { // if previousStartedEventId < currentStartedEventId - the last workflow task handled by // these state machines is ahead of the last handled workflow task known by the server. // Something is off, the server lost progress. @@ -219,7 +219,7 @@ public QueryResult handleDirectQueryWorkflowTask( @Override public void setCurrentStartedEvenId(Long eventId) { - workflowStateMachines.setCurrentStartedEventId(eventId); + workflowStateMachines.setLastWFTStartedEventId(eventId); } private void handleWorkflowTaskImpl( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java index 9f0628960..aa42f0285 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java @@ -80,7 +80,7 @@ enum HandleEventStatus { private long workflowTaskStartedEventId; /** EventId of the last WorkflowTaskStarted event handled by these state machines. */ - private long currentStartedEventId; + private long lastWFTStartedEventId; /** The Build ID used in the current WFT if already completed and set (may be null) */ private String currentTaskBuildId; @@ -205,21 +205,25 @@ public void setWorklfowStartedEventId(long workflowTaskStartedEventId) { this.workflowTaskStartedEventId = workflowTaskStartedEventId; } - public void setCurrentStartedEventId(long eventId) { + public void setLastWFTStartedEventId(long eventId) { // We have to drop any state machines (which should only be one workflow task machine) // created when handling the speculative workflow task for (long i = this.lastHandledEventId; i > eventId; i--) { stateMachines.remove(i); } - this.currentStartedEventId = eventId; + this.lastWFTStartedEventId = eventId; // When we reset the event ID on a speculative WFT we need to move this counter back // to the last WFT completed to allow new tasks to be processed. Assume the WFT complete // always follows the WFT started. this.lastHandledEventId = eventId + 1; } - public long getCurrentStartedEventId() { - return currentStartedEventId; + public long getLastWFTStartedEventId() { + return lastWFTStartedEventId; + } + + public long getCurrentWFTStartedEventId() { + return workflowTaskStartedEventId; } public long getHistorySize() { @@ -716,7 +720,7 @@ private long setCurrentTimeMillis(long currentTimeMillis) { } public long getLastStartedEventId() { - return currentStartedEventId; + return lastWFTStartedEventId; } /** @@ -1177,7 +1181,7 @@ public void workflowTaskStarted( value.nonReplayWorkflowTaskStarted(); } } - WorkflowStateMachines.this.currentStartedEventId = startedEventId; + WorkflowStateMachines.this.lastWFTStartedEventId = startedEventId; WorkflowStateMachines.this.historySize = historySize; WorkflowStateMachines.this.isContinueAsNewSuggested = isContinueAsNewSuggested; @@ -1295,6 +1299,6 @@ private String createEventHandlingMessage(HistoryEvent event) { private String createShortCurrentStateMessagePostfix() { return String.format( "{WorkflowTaskStartedEventId=%s, CurrentStartedEventId=%s}", - this.workflowTaskStartedEventId, this.currentStartedEventId); + this.workflowTaskStartedEventId, this.lastWFTStartedEventId); } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java index c22c8a1ae..f44132348 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java @@ -137,7 +137,7 @@ public String getCronSchedule() { @Override public long getHistoryLength() { - return context.getCurrentWorkflowTaskStartedEventId(); + return context.getLastWorkflowTaskStartedEventId(); } @Override diff --git a/temporal-sdk/src/test/java/io/temporal/worker/BuildIdVersioningTest.java b/temporal-sdk/src/test/java/io/temporal/worker/BuildIdVersioningTest.java index b8a6bd31c..22a27922f 100644 --- a/temporal-sdk/src/test/java/io/temporal/worker/BuildIdVersioningTest.java +++ b/temporal-sdk/src/test/java/io/temporal/worker/BuildIdVersioningTest.java @@ -24,6 +24,7 @@ import io.temporal.activity.Activity; import io.temporal.activity.ActivityOptions; +import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.client.BuildIdOperation; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; @@ -45,7 +46,6 @@ public class BuildIdVersioningTest { SDKTestWorkflowRule.newBuilder() .setWorkerOptions( WorkerOptions.newBuilder().setBuildId("1.0").setUseBuildIdForVersioning(true).build()) - .setWorkflowTypes(BuildIdVersioningTest.TestVersioningWorkflowImpl.class) .setActivityImplementations(new BuildIdVersioningTest.ActivityImpl()) .setDoNotStart(true) .build(); @@ -57,6 +57,10 @@ public void testBuildIdVersioningDataSetProperly() { String taskQueue = testWorkflowRule.getTaskQueue(); WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + testWorkflowRule + .getWorker() + .registerWorkflowImplementationTypes( + BuildIdVersioningTest.TestVersioningWorkflowImpl.class); // Add 1.0 to the queue workflowClient.updateWorkerBuildIdCompatability( @@ -129,6 +133,10 @@ public void testCurrentBuildIDSetProperly() throws InterruptedException { String taskQueue = testWorkflowRule.getTaskQueue(); WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + testWorkflowRule + .getWorker() + .registerWorkflowImplementationTypes( + BuildIdVersioningTest.TestCurrentBuildIdWorkflow.class); // Add 1.0 to the queue workflowClient.updateWorkerBuildIdCompatability( @@ -152,7 +160,16 @@ public void testCurrentBuildIDSetProperly() throws InterruptedException { // Wait for activity to run ACTIVITY_RAN.waitForSignal(); Assert.assertEquals("1.0", wf1.getState()); + testWorkflowRule.getTestEnvironment().shutdown(); + workflowClient + .getWorkflowServiceStubs() + .blockingStub() + .resetStickyTaskQueue( + io.temporal.api.workflowservice.v1.ResetStickyTaskQueueRequest.newBuilder() + .setNamespace(testWorkflowRule.getTestEnvironment().getNamespace()) + .setExecution(WorkflowExecution.newBuilder().setWorkflowId(workflowId).build()) + .build()); // Add 1.1 to the queue workflowClient.updateWorkerBuildIdCompatability( @@ -164,7 +181,7 @@ public void testCurrentBuildIDSetProperly() throws InterruptedException { w11F.newWorker( taskQueue, WorkerOptions.newBuilder().setBuildId("1.1").setUseBuildIdForVersioning(true).build()); - w11.registerWorkflowImplementationTypes(BuildIdVersioningTest.TestVersioningWorkflowImpl.class); + w11.registerWorkflowImplementationTypes(BuildIdVersioningTest.TestCurrentBuildIdWorkflow.class); w11.registerActivitiesImplementations(new BuildIdVersioningTest.ActivityImpl()); w11F.start(); @@ -219,18 +236,27 @@ public static class TestCurrentBuildIdWorkflow implements TestWorkflows.Queryabl TestActivities.TestActivity1.class, ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofSeconds(10)).build()); private boolean doFinish = false; + private String lastBuildId; @WorkflowMethod public String execute() { + updateBuildId(); Workflow.sleep(1); + updateBuildId(); if (Workflow.getInfo().getCurrentBuildId().orElse("").equals("1.0")) { activity.execute("foo"); + updateBuildId(); ACTIVITY_RAN.signal(); } Workflow.await(() -> doFinish); + updateBuildId(); return "Yay done"; } + private void updateBuildId() { + lastBuildId = Workflow.getInfo().getCurrentBuildId().orElse(""); + } + @Override public void mySignal(String arg) { doFinish = true; @@ -238,7 +264,8 @@ public void mySignal(String arg) { @Override public String getState() { - return Workflow.getInfo().getCurrentBuildId().orElse(""); + // Workflow.getInfo isn't accessible in queries, so we do this + return lastBuildId; } } } diff --git a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java index 6de20e545..25daa91e0 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java +++ b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java @@ -358,7 +358,7 @@ public Map getHeader() { } @Override - public long getCurrentWorkflowTaskStartedEventId() { + public long getLastWorkflowTaskStartedEventId() { return 0; }