Skip to content

Commit

Permalink
Enable speculative workflow task with commands
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Nov 24, 2024
1 parent 99a35d4 commit 2982081
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 2 deletions.
2 changes: 1 addition & 1 deletion internal/cmd/build/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (b *builder) integrationTest() error {
}

// Run integration test
args := []string{"go", "test", "-tags", "protolegacy", "-count", "1", "-race", "-v", "-timeout", "10m"}
args := []string{"go", "test", "-tags", "protolegacy", "-count", "1", "-v", "-timeout", "10m"}
if *runFlag != "" {
args = append(args, "-run", *runFlag)
}
Expand Down
3 changes: 3 additions & 0 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1907,6 +1907,9 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
BuildId: wth.workerBuildID,
UseVersioning: wth.useBuildIDForVersioning,
},
Capabilities: &workflowservice.RespondWorkflowTaskCompletedRequest_Capabilities{
DiscardSpeculativeWorkflowTaskWithEvents: true,
},
}
if wth.capabilities != nil && wth.capabilities.BuildIdBasedVersioning {
builtRequest.BinaryChecksum = ""
Expand Down
2 changes: 1 addition & 1 deletion test/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module go.temporal.io/sdk/test

go 1.21
go 1.22

toolchain go1.21.1

Expand Down
55 changes: 55 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3900,6 +3900,61 @@ func (ts *IntegrationTestSuite) TestUpdateRejectedDuplicated() {
ts.client.CancelWorkflow(ctx, run.GetID(), run.GetRunID())
}

func (ts *IntegrationTestSuite) TestSpeculativeUpdate() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
options := ts.startWorkflowOptions("test-speculative-update")
run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.WorkflowWithUpdate)
ts.NoError(err)
// Send a regular update
handle, err := ts.client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
WorkflowID: run.GetID(),
RunID: run.GetRunID(),
UpdateName: "update",
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []interface{}{1},
})
ts.NoError(err)
ts.NoError(handle.Get(ctx, nil))

for i := 0; i < 5; i++ {
handle, err = ts.client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
WorkflowID: run.GetID(),
RunID: run.GetRunID(),
UpdateName: "update",
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []interface{}{0},
})
ts.NoError(err)
ts.Error(handle.Get(ctx, nil))
}

handle, err = ts.client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
WorkflowID: run.GetID(),
RunID: run.GetRunID(),
UpdateName: "update",
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []interface{}{12},
})
ts.NoError(err)
ts.NoError(handle.Get(ctx, nil))

for i := 0; i < 5; i++ {
handle, err = ts.client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
WorkflowID: run.GetID(),
RunID: run.GetRunID(),
UpdateName: "update",
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []interface{}{0},
})
ts.NoError(err)
ts.Error(handle.Get(ctx, nil))
}

ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "unblock", nil)
ts.NoError(run.Get(ctx, nil))
}

func (ts *IntegrationTestSuite) TestUpdateSettingHandlerInGoroutine() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expand Down
36 changes: 36 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"errors"
"fmt"
"log"
mathrand "math/rand/v2"
"reflect"
"strconv"
"strings"
Expand Down Expand Up @@ -2701,6 +2702,40 @@ func (w *Workflows) WorkflowWithRejectableUpdate(ctx workflow.Context) error {
return nil
}

func (w *Workflows) WorkflowWithUpdate(ctx workflow.Context) error {
workflow.SetUpdateHandlerWithOptions(ctx, "update",
func(ctx workflow.Context, count int) error {
for i := 0; i < count; i++ {
var i int
err := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return mathrand.IntN(4)
}).Get(&i)
if err != nil {
return err
}
if i == 0 {
workflow.NewTimer(ctx, time.Hour)
} else if i == 1 {
workflow.GetVersion(ctx, "change-id", workflow.DefaultVersion, 1)
} else if i == 2 {
ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions())
var a *Activities
workflow.ExecuteActivity(ctx, a.WaitForWorkerStop, time.Hour)
}
}
return nil
}, workflow.UpdateHandlerOptions{
Validator: func(ctx workflow.Context, count int) error {
if count <= 0 {
return errors.New("test update rejected")
}
return nil
},
})
workflow.GetSignalChannel(ctx, "unblock").Receive(ctx, nil)
return nil
}

func (w *Workflows) UpdateOrdering(ctx workflow.Context) (int, error) {
updatesRan := 0
updateHandle := func(ctx workflow.Context) error {
Expand Down Expand Up @@ -3358,6 +3393,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.UserMetadata)
worker.RegisterWorkflow(w.AwaitWithOptions)
worker.RegisterWorkflow(w.WorkflowWithRejectableUpdate)
worker.RegisterWorkflow(w.WorkflowWithUpdate)

worker.RegisterWorkflow(w.child)
worker.RegisterWorkflow(w.childWithRetryPolicy)
Expand Down

0 comments on commit 2982081

Please sign in to comment.