Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query Failure #6947

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ require (
go.opentelemetry.io/otel/sdk v1.31.0
go.opentelemetry.io/otel/sdk/metric v1.31.0
go.opentelemetry.io/otel/trace v1.31.0
go.temporal.io/api v1.43.0
go.temporal.io/api v1.43.1-0.20241206174056-8a5e1486fbf5
go.temporal.io/sdk v1.30.1
go.temporal.io/version v0.3.0
go.uber.org/automaxprocs v1.5.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,8 @@ go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HY
go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A=
go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0=
go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8=
go.temporal.io/api v1.43.0 h1:lBhq+u5qFJqGMXwWsmg/i8qn1UA/3LCwVc88l2xUMHg=
go.temporal.io/api v1.43.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/api v1.43.1-0.20241206174056-8a5e1486fbf5 h1:/gZwoMXVPu9HouFLdNdHDgIDxtVfvuY3rqApj2ffZ20=
go.temporal.io/api v1.43.1-0.20241206174056-8a5e1486fbf5/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/sdk v1.30.1 h1:4wgfSjwuaayQl9Q0mUzpNV6w55TPAESSroR6Z5lE49o=
go.temporal.io/sdk v1.30.1/go.mod h1:hNCZzd6dt7bxD9B4AECQgjHTd2NrzjdmGDbbv4xHuFU=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/queryworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func Invoke(
},
}, nil
case enumspb.QUERY_RESULT_TYPE_FAILED:
return nil, serviceerror.NewQueryFailed(result.GetErrorMessage())
return nil, serviceerror.NewQueryFailedWithFailure(result.GetErrorMessage(), result.GetFailure())
default:
metrics.QueryRegistryInvalidStateCount.With(scope).Record(1)
return nil, consts.ErrQueryEnteredInvalidState
Expand Down
2 changes: 1 addition & 1 deletion service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ func (e *matchingEngineImpl) QueryWorkflow(
case enumspb.QUERY_RESULT_TYPE_ANSWERED:
return &matchingservice.QueryWorkflowResponse{QueryResult: workerResponse.GetCompletedRequest().GetQueryResult()}, nil
case enumspb.QUERY_RESULT_TYPE_FAILED:
return nil, serviceerror.NewQueryFailed(workerResponse.GetCompletedRequest().GetErrorMessage())
return nil, serviceerror.NewQueryFailedWithFailure(workerResponse.GetCompletedRequest().GetErrorMessage(), workerResponse.GetCompletedRequest().GetFailure())
default:
return nil, serviceerror.NewInternal("unknown query completed type")
}
Expand Down
129 changes: 129 additions & 0 deletions tests/query_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,30 @@ import (
"context"
"errors"
"fmt"
"maps"
"slices"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
commandpb "go.temporal.io/api/command/v1"
"go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/failure/v1"
querypb "go.temporal.io/api/query/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/testing/testvars"
"go.temporal.io/server/common/util"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/tests/testcore"
)
Expand Down Expand Up @@ -357,3 +368,121 @@ func (s *QueryWorkflowSuite) TestQueryWorkflow_ClosedWithoutWorkflowTaskStarted(
s.Error(err)
s.ErrorContains(err, consts.ErrWorkflowClosedBeforeWorkflowTaskStarted.Error())
}

func (s *QueryWorkflowSuite) TestQueryWorkflow_FailurePropagated() {
ctx := testcore.NewContext()
taskQueue := testcore.RandomizeStr(s.T().Name())

workflowRun, err := s.SdkClient().ExecuteWorkflow(ctx, client.StartWorkflowOptions{TaskQueue: taskQueue}, "workflow")
s.NoError(err)

// Create a channel for errors generated in background goroutines.
errChan := make(chan error, 1)

// Query the workflow in the background to have the query delivered with the first workflow task in the Queries map.
go func() {
_, err = s.FrontendClient().QueryWorkflow(ctx, &workflowservice.QueryWorkflowRequest{
Namespace: s.Namespace(),
Execution: &common.WorkflowExecution{
WorkflowId: workflowRun.GetID(),
},
Query: &querypb.WorkflowQuery{
QueryType: "dont-care",
},
})
errChan <- err
}()

// Hope that 3 seconds will be enough for history to record the query and attach it to the pending workflow task.
// There's really no other way to ensure that the query is included in the task unfortunately.
util.InterruptibleSleep(ctx, 3*time.Second)

task, err := s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{
Namespace: s.Namespace(),
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
Identity: s.T().Name(),
})

s.Len(task.Queries, 1)
qKey := slices.Collect(maps.Keys(task.Queries))[0]

_, err = s.FrontendClient().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: task.TaskToken,
Identity: s.T().Name(),
QueryResults: map[string]*querypb.WorkflowQueryResult{
qKey: {
ResultType: enumspb.QUERY_RESULT_TYPE_FAILED,
ErrorMessage: "my error message",
Failure: &failure.Failure{
Message: "my failure error message",
},
},
},
Commands: []*commandpb.Command{
{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{},
},
},
})
s.NoError(err)

select {
case err = <-errChan:
case <-ctx.Done():
// Abort and fail the test.
s.NoError(ctx.Err())
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a small comment explaining the purpose of the second half of the test? I don't quite see the difference to the first half.

var query1FailedErr *serviceerror.QueryFailed
s.ErrorAs(err, &query1FailedErr)
s.Equal("my error message", query1FailedErr.Message)
s.Equal("my failure error message", query1FailedErr.Failure.Message)

go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q: why not just a single test, with both "QueryWorkflow" and "PollWorkflowTaskQueue/RespondQueryTaskCompleted" in a separate goroutines? What are you testing by testing like that?

task, err := s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{
Namespace: s.Namespace(),
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
Identity: s.T().Name(),
})
if err != nil {
errChan <- err
return
}

_, err = s.FrontendClient().RespondQueryTaskCompleted(ctx, &workflowservice.RespondQueryTaskCompletedRequest{
Namespace: s.Namespace(),
TaskToken: task.TaskToken,
CompletedType: enumspb.QUERY_RESULT_TYPE_FAILED,
ErrorMessage: "my error message",
Failure: &failure.Failure{
Message: "my failure error message",
},
})

errChan <- err
}()

_, err = s.FrontendClient().QueryWorkflow(ctx, &workflowservice.QueryWorkflowRequest{
Namespace: s.Namespace(),
Execution: &common.WorkflowExecution{
WorkflowId: workflowRun.GetID(),
},
Query: &querypb.WorkflowQuery{
QueryType: "dont-care",
},
})

var query2FailedErr *serviceerror.QueryFailed
s.ErrorAs(err, &query2FailedErr)
s.Equal("my error message", query2FailedErr.Message)
s.Equal("my failure error message", query2FailedErr.Failure.Message)

select {
case err = <-errChan:
s.NoError(err)
case <-ctx.Done():
// Abort and fail the test.
s.NoError(ctx.Err())
}
}
Loading