Skip to content
This repository has been archived by the owner on Aug 15, 2021. It is now read-only.

Commit

Permalink
Workflow description based status
Browse files Browse the repository at this point in the history
Use the workflow description api for status, as an example
  • Loading branch information
jbowes committed Jul 15, 2021
1 parent 8452d0a commit a8355a4
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 50 deletions.
55 changes: 41 additions & 14 deletions workflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (

"github.com/go-zoo/bone"
"github.com/gofrs/uuid"
"github.com/gogo/status"
"go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/client"
"google.golang.org/grpc/codes"
)

func Api(c client.Client, r *Repository) http.Handler {
func Api(c client.Client) http.Handler {
mux := bone.New()

// API to launch an workflow. could be an incoming webhook,
Expand All @@ -34,7 +37,7 @@ func Api(c client.Client, r *Repository) http.Handler {
TaskQueue: PRCheckTaskQueue,
}

work, err := c.ExecuteWorkflow(context.Background(), options, (&CheckPR{}).CheckPR, details)
work, err := c.ExecuteWorkflow(context.Background(), options, CheckPR, details)
if err != nil {
rw.WriteHeader(http.StatusInternalServerError)
rw.Write([]byte("couldn't enqueue"))
Expand All @@ -50,26 +53,50 @@ func Api(c client.Client, r *Repository) http.Handler {

// Get the status of a given job
mux.Get("/jobs/:id", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
ctx := req.Context()

id := bone.GetValue(req, "id")

rw.Header().Add("Content-Type", "application/json")
enc := json.NewEncoder(rw)

var s PRStatus
err := r.Get(req.Context(), id, &s)
switch err {
case nil:
rw.WriteHeader(http.StatusOK)
enc.Encode(&s)
case ErrNotFound:
// TODO: ask temporal here, in case it is PENDING.
// Does the requested workflow execution exist at all?
desc, err := c.DescribeWorkflowExecution(ctx, id, "")
switch {
case err == nil:
case status.Code(err) == codes.NotFound: // XXX: this isn't the right check
rw.WriteHeader(http.StatusNotFound)
enc.Encode("not found")
fmt.Fprint(rw, "{}")
return
default:
rw.WriteHeader(http.StatusInternalServerError)
enc.Encode("error")
fmt.Println(err)
fmt.Fprint(rw, "{}")
return
}

// You could check the execution "memo" here for
// thinks like an org id for AuthZ.

var out struct {
Status string `json:"status"`
}

// Figure out what our status is.
// As soon as the work is added to the queue, its status
// will be running (so "pending" won't work here).
switch desc.WorkflowExecutionInfo.Status {
case enums.WORKFLOW_EXECUTION_STATUS_COMPLETED:
out.Status = "completed"
case enums.WORKFLOW_EXECUTION_STATUS_RUNNING, enums.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW:
out.Status = "running"
case enums.WORKFLOW_EXECUTION_STATUS_TIMED_OUT:
out.Status = "timed_out"
default: // cancelled, errored, etc
out.Status = "errored"
}

rw.WriteHeader(http.StatusOK)
enc := json.NewEncoder(rw)
enc.Encode(out)
}))

// API to complete an activity.
Expand Down
1 change: 1 addition & 0 deletions workflow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ require (
github.com/go-zoo/bone v1.3.0
github.com/gofrs/uuid v4.0.0+incompatible
go.temporal.io/sdk v1.8.0
google.golang.org/grpc v1.37.0
)
3 changes: 3 additions & 0 deletions workflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"log"
"net/http"
"time"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
Expand All @@ -29,6 +30,8 @@ func main() {
log.Fatalln(err)
}()

time.Sleep(2 * time.Minute)

// This worker hosts both Worker and Activity functions
w := worker.New(c, PRCheckTaskQueue, worker.Options{})

Expand Down
37 changes: 1 addition & 36 deletions workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,13 @@ type CheckDetails struct {
New string
}

type CheckPR struct {
r *Repository
}

func (cpr *CheckPR) CheckPR(ctx workflow.Context, details CheckDetails) error {
func CheckPR(ctx workflow.Context, details CheckDetails) error {
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: time.Minute, // Max 1 minute before quitting
})

ctx = workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
StartToCloseTimeout: time.Minute, // Max 1 minute before quitting
})

i := workflow.GetInfo(ctx)

// TODO: put status in error on failure.
var status PRStatus

// Start both tests. We get back futures. They transparently handle
// retries and persisting the results.
status.Status = append([]PRStatusItem{{State: "testing", TimeStamp: workflow.Now(ctx), Description: "tests are running"}}, status.Status...)
if err := workflow.ExecuteLocalActivity(ctx, cpr.r.Put, i.WorkflowExecution.ID, status).Get(ctx, nil); err != nil {
return err
}

old := workflow.ExecuteActivity(ctx, Test, details.Repo, details.Old)
new := workflow.ExecuteActivity(ctx, Test, details.Repo, details.New)

Expand All @@ -63,33 +45,16 @@ func (cpr *CheckPR) CheckPR(ctx workflow.Context, details CheckDetails) error {

// fast to do and deterministic. run inside the workflow.
var diff string
status.Status = append([]PRStatusItem{{State: "diffing", TimeStamp: workflow.Now(ctx), Description: "test results are diffing"}}, status.Status...)
if err := workflow.ExecuteLocalActivity(ctx, cpr.r.Put, i.WorkflowExecution.ID, status).Get(ctx, nil); err != nil {
return err
}

err := workflow.ExecuteActivity(ctx, DiffResults, oldRes, newRes).Get(ctx, &diff)
if err != nil {
return err
}

// Resolve the final task and finish.
status.Status = append([]PRStatusItem{{State: "reporting", TimeStamp: workflow.Now(ctx), Description: "PR check results are being posted"}}, status.Status...)
if err := workflow.ExecuteLocalActivity(ctx, cpr.r.Put, i.WorkflowExecution.ID, status).Get(ctx, nil); err != nil {
return err
}

if err := workflow.ExecuteActivity(ctx, SetCommitStatus, details.Repo, details.PR, diff).Get(ctx, nil); err != nil {
return err
}

// XXX: Does setting this final status make sense? how will query interact with the actual workflow completion status?
// is there a race condition? probably doesn't matter.
status.Status = append([]PRStatusItem{{State: "complete", TimeStamp: workflow.Now(ctx), Description: "All done"}}, status.Status...)
if err := workflow.ExecuteLocalActivity(ctx, cpr.r.Put, i.WorkflowExecution.ID, status).Get(ctx, nil); err != nil {
return err
}

return nil
}

Expand Down

0 comments on commit a8355a4

Please sign in to comment.