Skip to content
This repository has been archived by the owner on Sep 16, 2024. It is now read-only.

Commit

Permalink
fix(workflow): workflow approval
Browse files Browse the repository at this point in the history
Fix approval step reject

#1373, #1375, #1389
  • Loading branch information
alexcodelf authored and aiwantaozi committed Nov 10, 2023
1 parent f8e677e commit 4274a88
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 77 deletions.
41 changes: 40 additions & 1 deletion pkg/apis/workflowstepexecution/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/seal-io/walrus/pkg/dao/model"
"github.com/seal-io/walrus/pkg/dao/model/subject"
"github.com/seal-io/walrus/pkg/dao/model/workflowstepexecution"
"github.com/seal-io/walrus/pkg/dao/types"
"github.com/seal-io/walrus/pkg/dao/types/object"
Expand All @@ -13,6 +14,7 @@ import (
pkgworkflow "github.com/seal-io/walrus/pkg/workflow"
"github.com/seal-io/walrus/utils/gopool"
"github.com/seal-io/walrus/utils/log"
"github.com/seal-io/walrus/utils/strs"
"github.com/seal-io/walrus/utils/topic"
)

Expand Down Expand Up @@ -41,7 +43,15 @@ func (h Handler) Update(req UpdateRequest) error {
case types.ExecutionStatusSucceeded:
status.WorkflowStepExecutionStatusRunning.True(entity, "")
case types.ExecutionStatusFailed, types.ExecutionStatusError:
status.WorkflowStepExecutionStatusRunning.False(entity, "")
message := ""
if entity.Type == types.WorkflowStepTypeApproval {
message, err = h.getRejectMessage(req.Context, entity)
if err != nil {
return err
}
}

status.WorkflowStepExecutionStatusRunning.False(entity, message)
case types.ExecutionStatusRunning:
status.WorkflowExecutionStatusPending.True(entity, "")
status.WorkflowStepExecutionStatusRunning.Unknown(entity, "")
Expand Down Expand Up @@ -92,3 +102,32 @@ func (h Handler) Update(req UpdateRequest) error {

return nil
}

func (h Handler) getRejectMessage(ctx context.Context, entity *model.WorkflowStepExecution) (string, error) {
message := ""

approvalSpec, err := types.NewWorkflowStepApprovalSpec(entity.Attributes)
if err != nil {
return "", err
}

if approvalSpec.IsRejected() {
rejectedUsers := approvalSpec.RejectedUsers
rejectedUserNames := make([]string, len(rejectedUsers))

subjects, err := h.modelClient.Subjects().Query().
Where(subject.IDIn(rejectedUsers...)).
All(ctx)
if err != nil {
return "", err
}

for i := range subjects {
rejectedUserNames[i] = subjects[i].Name
}

message = "rejected by " + strs.Join[string](",", rejectedUserNames...)
}

return message, nil
}
19 changes: 16 additions & 3 deletions pkg/apis/workflowstepexecution/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"io"

"github.com/seal-io/walrus/pkg/dao/model"
"github.com/seal-io/walrus/pkg/dao/model/workflowexecution"
"github.com/seal-io/walrus/pkg/dao/model/workflowstepexecution"
"github.com/seal-io/walrus/pkg/workflow"
)
Expand Down Expand Up @@ -47,12 +49,23 @@ func (h Handler) RouteApprove(req RouteApproveRequest) error {
return err
}

client, err := workflow.NewArgoWorkflowClient(h.modelClient, h.k8sConfig)
workflowExecution, err := h.modelClient.WorkflowExecutions().Query().
Where(workflowexecution.ID(stepExecution.WorkflowExecutionID)).
Only(req.Context)
if err != nil {
return err
}

return client.Resume(req.Context, workflow.ResumeOptions{
WorkflowStepExecution: stepExecution,
return h.modelClient.WithTx(req.Context, func(tx *model.Tx) error {
client, err := workflow.NewArgoWorkflowClient(h.modelClient, h.k8sConfig)
if err != nil {
return err
}

return client.Resume(req.Context, workflow.ResumeOptions{
Approve: req.Approve,
WorkflowExecution: workflowExecution,
WorkflowStepExecution: stepExecution,
})
})
}
2 changes: 2 additions & 0 deletions pkg/apis/workflowstepexecution/extension_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type RouteApproveRequest struct {
_ struct{} `route:"POST=/approve"`

model.WorkflowStepExecutionQueryInput `path:",inline"`

Approve bool `json:"approve"`
}

func (r *RouteApproveRequest) Validate() error {
Expand Down
115 changes: 84 additions & 31 deletions pkg/dao/types/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,14 @@ const (
WorkflowStepApprovalUsers = "approvalUsers"
// WorkflowStepApprovedUsers is the key of approved users in spec.
WorkflowStepApprovedUsers = "approvedUsers"
// WorkflowStepRejectedUsers is the key of reject users in spec.
WorkflowStepRejectedUsers = "rejectedUsers"
)

type WorkflowStepApprovalSpec struct {
ApprovalUsers []object.ID `json:"approvalUsers"`
ApprovedUsers []object.ID `json:"approvedUsers"`
RejectedUsers []object.ID `json:"rejectedUsers"`
Type string `json:"approvalType"`
}

Expand All @@ -83,26 +86,46 @@ func NewWorkflowStepApprovalSpec(spec map[string]any) (*WorkflowStepApprovalSpec
return nil, errors.New("invalid input: invalid approval type")
}

if v, ok := spec[WorkflowStepApprovalUsers].([]any); ok {
users, err := toObjectIDs(v)
if err != nil {
return nil, err
}
s.ApprovalUsers = removeDuplicatedUsers(users)
userIndexes := []string{
WorkflowStepApprovalUsers,
WorkflowStepApprovedUsers,
WorkflowStepRejectedUsers,
}

if v, ok := spec[WorkflowStepApprovedUsers].([]any); ok {
users, err := toObjectIDs(v)
if err != nil {
return nil, err
for i := range userIndexes {
if _, ok := spec[userIndexes[i]]; !ok {
continue
}

if v, ok := spec[userIndexes[i]].([]any); ok {
users, err := toObjectIDs(v)
if err != nil {
return nil, err
}

switch userIndexes[i] {
case WorkflowStepApprovalUsers:
s.ApprovalUsers = users
case WorkflowStepApprovedUsers:
s.ApprovedUsers = users
case WorkflowStepRejectedUsers:
s.RejectedUsers = users
}
}
s.ApprovedUsers = removeDuplicatedUsers(users)
}

return s, nil
}

func (s *WorkflowStepApprovalSpec) IsRejected() bool {
return len(s.RejectedUsers) > 0
}

func (s *WorkflowStepApprovalSpec) IsApproved() bool {
if s.IsRejected() {
return false
}

if s.Type == WorkflowStepApprovalTypeOr {
return len(s.ApprovedUsers) > 0
}
Expand All @@ -115,7 +138,7 @@ func (s *WorkflowStepApprovalSpec) IsApproved() bool {
return false
}

func (s *WorkflowStepApprovalSpec) SetApprovedUser(user object.ID) error {
func (s *WorkflowStepApprovalSpec) IsApprovalUser(user object.ID) bool {
isApprovalUser := false

for i := range s.ApprovalUsers {
Expand All @@ -126,14 +149,42 @@ func (s *WorkflowStepApprovalSpec) SetApprovedUser(user object.ID) error {
isApprovalUser = true
}

if !isApprovalUser {
return isApprovalUser
}

// SetUserApproval sets the user approval status.
func (s *WorkflowStepApprovalSpec) SetUserApproval(user object.ID, approved bool) error {
if approved {
return s.SetApprovedUser(user)
}

return s.SetRejectedUser(user)
}

func (s *WorkflowStepApprovalSpec) SetApprovedUser(user object.ID) error {
if !s.IsApprovalUser(user) {
return errors.New("user is not an approval users")
}

if isExist(user, s.ApprovedUsers) {
return nil
}

s.ApprovedUsers = append(s.ApprovedUsers, user)

// Remove duplicated users.
s.ApprovedUsers = removeDuplicatedUsers(s.ApprovedUsers)
return nil
}

func (s *WorkflowStepApprovalSpec) SetRejectedUser(user object.ID) error {
if !s.IsApprovalUser(user) {
return errors.New("user is not an approval users")
}

if isExist(user, s.ApprovedUsers) {
return nil
}

s.RejectedUsers = append(s.RejectedUsers, user)

return nil
}
Expand All @@ -143,27 +194,13 @@ func (s *WorkflowStepApprovalSpec) ToAttributes() map[string]any {
WorkflowStepApprovalType: s.Type,
WorkflowStepApprovalUsers: s.ApprovalUsers,
WorkflowStepApprovedUsers: s.ApprovedUsers,
WorkflowStepRejectedUsers: s.RejectedUsers,
}
}

func removeDuplicatedUsers(users []object.ID) []object.ID {
m := make(map[object.ID]struct{}, len(users))

for i := range users {
m[users[i]] = struct{}{}
}

newUsers := make([]object.ID, 0, len(m))

for k := range m {
newUsers = append(newUsers, k)
}

return newUsers
}

func toObjectIDs(users []any) ([]object.ID, error) {
ids := make([]object.ID, 0, len(users))
um := make(map[object.ID]struct{}, len(users))

for i := range users {
if v, ok := users[i].(string); ok {
Expand All @@ -172,9 +209,25 @@ func toObjectIDs(users []any) ([]object.ID, error) {
return nil, fmt.Errorf("invalid user id: %s", v)
}

if _, ok := um[id]; ok {
continue
}

ids = append(ids, id)
um[id] = struct{}{}
}
}

return ids, nil
}

// isExist checks if user is in users.
func isExist(user object.ID, users []object.ID) bool {
for i := range users {
if users[i] == user {
return true
}
}

return false
}
55 changes: 30 additions & 25 deletions pkg/workflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/seal-io/walrus/pkg/auths"
"github.com/seal-io/walrus/pkg/auths/session"
"github.com/seal-io/walrus/pkg/dao/model"
"github.com/seal-io/walrus/pkg/dao/model/workflowexecution"
"github.com/seal-io/walrus/pkg/dao/types"
"github.com/seal-io/walrus/pkg/dao/types/object"
"github.com/seal-io/walrus/pkg/k8s"
Expand Down Expand Up @@ -55,9 +54,12 @@ type (
// SubmitOptions is the options for submitting a workflow.
// WorkflowExecution's Edge WorkflowStageExecutions and their Edge WorkflowStepExecutions must be set.
ResumeOptions struct {
// Only used when Type is "workflow".
// Approve or deny of the workflow approval step execution.
Approve bool

// WorkflowExecution is the workflow execution to be resumed.
WorkflowExecution *model.WorkflowExecution
// Only used when Type is "step".
// WorkflowStepExecution is the workflow step execution to be resumed.
WorkflowStepExecution *model.WorkflowStepExecution
}

Expand Down Expand Up @@ -161,7 +163,8 @@ func (s *ArgoWorkflowClient) Resume(ctx context.Context, opts ResumeOptions) err
return err
}

if err = approvalSpec.SetApprovedUser(subject.ID); err != nil {
err = approvalSpec.SetUserApproval(subject.ID, opts.Approve)
if err != nil {
return err
}

Expand All @@ -173,31 +176,33 @@ func (s *ArgoWorkflowClient) Resume(ctx context.Context, opts ResumeOptions) err
return err
}

// If not approved, do nothing.
if !approvalSpec.IsApproved() {
return nil
}
if approvalSpec.IsRejected() {
// Stop workflow step execution if rejected.
_, err = s.apiClient.NewWorkflowServiceClient().StopWorkflow(s.apiClient.Ctx, &workflow.WorkflowStopRequest{
Name: getWorkflowName(opts.WorkflowExecution),
Namespace: types.WalrusSystemNamespace,
NodeFieldSelector: fmt.Sprintf("templateName=suspend-%s", opts.WorkflowStepExecution.ID.String()),
})
} else {
// If not approved, do nothing.
if !approvalSpec.IsApproved() {
return nil
}

workflowExecution, err := s.mc.WorkflowExecutions().Query().
Where(workflowexecution.ID(opts.WorkflowStepExecution.WorkflowExecutionID)).
Only(ctx)
if err != nil {
return err
}
// Update secret token.
err = s.updateWorkflowExecutionToken(ctx, opts.WorkflowExecution)
if err != nil {
return err
}

// Update secret token.
err = s.updateWorkflowExecutionToken(ctx, workflowExecution)
if err != nil {
return err
// Resume workflow step execution.
_, err = s.apiClient.NewWorkflowServiceClient().ResumeWorkflow(s.apiClient.Ctx, &workflow.WorkflowResumeRequest{
Name: getWorkflowName(opts.WorkflowExecution),
Namespace: types.WalrusSystemNamespace,
NodeFieldSelector: fmt.Sprintf("templateName=suspend-%s", opts.WorkflowStepExecution.ID.String()),
})
}

// Resume workflow step execution.
_, err = s.apiClient.NewWorkflowServiceClient().ResumeWorkflow(s.apiClient.Ctx, &workflow.WorkflowResumeRequest{
Name: getWorkflowName(workflowExecution),
Namespace: types.WalrusSystemNamespace,
NodeFieldSelector: fmt.Sprintf("templateName=suspend-%s", opts.WorkflowStepExecution.ID.String()),
})

return err
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/workflow/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ func ArchiveWorkflowStepExecutionLogs(ctx context.Context, opts StepExecutionLog
return err
}

if len(logs) == 0 {
return nil
}

return opts.ModelClient.WorkflowStepExecutions().UpdateOne(opts.StepExecution).
SetRecord(string(logs)).
Exec(ctx)
Expand Down
Loading

0 comments on commit 4274a88

Please sign in to comment.