diff --git a/go.mod b/go.mod index 7d43dca5ced..7e8ebfd0315 100644 --- a/go.mod +++ b/go.mod @@ -56,7 +56,7 @@ require ( go.opentelemetry.io/otel/sdk v1.31.0 go.opentelemetry.io/otel/sdk/metric v1.31.0 go.opentelemetry.io/otel/trace v1.31.0 - go.temporal.io/api v1.43.0 + go.temporal.io/api v1.43.1-0.20241206174056-8a5e1486fbf5 go.temporal.io/sdk v1.30.1 go.temporal.io/version v0.3.0 go.uber.org/automaxprocs v1.5.3 diff --git a/go.sum b/go.sum index ac1a9c11b20..9765738a869 100644 --- a/go.sum +++ b/go.sum @@ -330,8 +330,8 @@ go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HY go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= -go.temporal.io/api v1.43.0 h1:lBhq+u5qFJqGMXwWsmg/i8qn1UA/3LCwVc88l2xUMHg= -go.temporal.io/api v1.43.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis= +go.temporal.io/api v1.43.1-0.20241206174056-8a5e1486fbf5 h1:/gZwoMXVPu9HouFLdNdHDgIDxtVfvuY3rqApj2ffZ20= +go.temporal.io/api v1.43.1-0.20241206174056-8a5e1486fbf5/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis= go.temporal.io/sdk v1.30.1 h1:4wgfSjwuaayQl9Q0mUzpNV6w55TPAESSroR6Z5lE49o= go.temporal.io/sdk v1.30.1/go.mod h1:hNCZzd6dt7bxD9B4AECQgjHTd2NrzjdmGDbbv4xHuFU= go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig= diff --git a/service/history/api/queryworkflow/api.go b/service/history/api/queryworkflow/api.go index 05f12cbbed2..60f2ba61198 100644 --- a/service/history/api/queryworkflow/api.go +++ b/service/history/api/queryworkflow/api.go @@ -216,7 +216,7 @@ func Invoke( }, }, nil case enumspb.QUERY_RESULT_TYPE_FAILED: - return nil, serviceerror.NewQueryFailed(result.GetErrorMessage()) + return nil, serviceerror.NewQueryFailedWithFailure(result.GetErrorMessage(), result.GetFailure()) default: metrics.QueryRegistryInvalidStateCount.With(scope).Record(1) return nil, consts.ErrQueryEnteredInvalidState diff --git a/service/matching/matching_engine.go b/service/matching/matching_engine.go index affa38585e8..0c87bee300c 100644 --- a/service/matching/matching_engine.go +++ b/service/matching/matching_engine.go @@ -960,7 +960,7 @@ func (e *matchingEngineImpl) QueryWorkflow( case enumspb.QUERY_RESULT_TYPE_ANSWERED: return &matchingservice.QueryWorkflowResponse{QueryResult: workerResponse.GetCompletedRequest().GetQueryResult()}, nil case enumspb.QUERY_RESULT_TYPE_FAILED: - return nil, serviceerror.NewQueryFailed(workerResponse.GetCompletedRequest().GetErrorMessage()) + return nil, serviceerror.NewQueryFailedWithFailure(workerResponse.GetCompletedRequest().GetErrorMessage(), workerResponse.GetCompletedRequest().GetFailure()) default: return nil, serviceerror.NewInternal("unknown query completed type") } diff --git a/tests/query_workflow_test.go b/tests/query_workflow_test.go index f2eaaf3303f..09eec1a2233 100644 --- a/tests/query_workflow_test.go +++ b/tests/query_workflow_test.go @@ -28,6 +28,8 @@ import ( "context" "errors" "fmt" + "maps" + "slices" "sync" "sync/atomic" "testing" @@ -35,12 +37,21 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + commandpb "go.temporal.io/api/command/v1" + "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/api/failure/v1" + querypb "go.temporal.io/api/query/v1" "go.temporal.io/api/serviceerror" + taskqueuepb "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/client" sdkclient "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/testing/testvars" + "go.temporal.io/server/common/util" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/tests/testcore" ) @@ -357,3 +368,121 @@ func (s *QueryWorkflowSuite) TestQueryWorkflow_ClosedWithoutWorkflowTaskStarted( s.Error(err) s.ErrorContains(err, consts.ErrWorkflowClosedBeforeWorkflowTaskStarted.Error()) } + +func (s *QueryWorkflowSuite) TestQueryWorkflow_FailurePropagated() { + ctx := testcore.NewContext() + taskQueue := testcore.RandomizeStr(s.T().Name()) + + workflowRun, err := s.SdkClient().ExecuteWorkflow(ctx, client.StartWorkflowOptions{TaskQueue: taskQueue}, "workflow") + s.NoError(err) + + // Create a channel for errors generated in background goroutines. + errChan := make(chan error, 1) + + // Query the workflow in the background to have the query delivered with the first workflow task in the Queries map. + go func() { + _, err = s.FrontendClient().QueryWorkflow(ctx, &workflowservice.QueryWorkflowRequest{ + Namespace: s.Namespace(), + Execution: &common.WorkflowExecution{ + WorkflowId: workflowRun.GetID(), + }, + Query: &querypb.WorkflowQuery{ + QueryType: "dont-care", + }, + }) + errChan <- err + }() + + // Hope that 3 seconds will be enough for history to record the query and attach it to the pending workflow task. + // There's really no other way to ensure that the query is included in the task unfortunately. + util.InterruptibleSleep(ctx, 3*time.Second) + + task, err := s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ + Namespace: s.Namespace(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + Identity: s.T().Name(), + }) + + s.Len(task.Queries, 1) + qKey := slices.Collect(maps.Keys(task.Queries))[0] + + _, err = s.FrontendClient().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ + TaskToken: task.TaskToken, + Identity: s.T().Name(), + QueryResults: map[string]*querypb.WorkflowQueryResult{ + qKey: { + ResultType: enumspb.QUERY_RESULT_TYPE_FAILED, + ErrorMessage: "my error message", + Failure: &failure.Failure{ + Message: "my failure error message", + }, + }, + }, + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{}, + }, + }, + }) + s.NoError(err) + + select { + case err = <-errChan: + case <-ctx.Done(): + // Abort and fail the test. + s.NoError(ctx.Err()) + } + + var query1FailedErr *serviceerror.QueryFailed + s.ErrorAs(err, &query1FailedErr) + s.Equal("my error message", query1FailedErr.Message) + s.Equal("my failure error message", query1FailedErr.Failure.Message) + + go func() { + task, err := s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ + Namespace: s.Namespace(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + Identity: s.T().Name(), + }) + if err != nil { + errChan <- err + return + } + + _, err = s.FrontendClient().RespondQueryTaskCompleted(ctx, &workflowservice.RespondQueryTaskCompletedRequest{ + Namespace: s.Namespace(), + TaskToken: task.TaskToken, + CompletedType: enumspb.QUERY_RESULT_TYPE_FAILED, + ErrorMessage: "my error message", + Failure: &failure.Failure{ + Message: "my failure error message", + }, + }) + + errChan <- err + }() + + _, err = s.FrontendClient().QueryWorkflow(ctx, &workflowservice.QueryWorkflowRequest{ + Namespace: s.Namespace(), + Execution: &common.WorkflowExecution{ + WorkflowId: workflowRun.GetID(), + }, + Query: &querypb.WorkflowQuery{ + QueryType: "dont-care", + }, + }) + + var query2FailedErr *serviceerror.QueryFailed + s.ErrorAs(err, &query2FailedErr) + s.Equal("my error message", query2FailedErr.Message) + s.Equal("my failure error message", query2FailedErr.Failure.Message) + + select { + case err = <-errChan: + s.NoError(err) + case <-ctx.Done(): + // Abort and fail the test. + s.NoError(ctx.Err()) + } +}