Skip to content

Commit

Permalink
Merge pull request #341 from Zibbp/temporal-paginate
Browse files Browse the repository at this point in the history
feat(workflows): paginate executions
  • Loading branch information
Zibbp authored Jan 4, 2024
2 parents 2a9ebe8 + 4542179 commit b1679ca
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 9 deletions.
55 changes: 48 additions & 7 deletions internal/temporal/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package temporal

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"

Expand All @@ -27,25 +28,65 @@ type WorkflowVideoIdResult struct {
ExternalVideoId string `json:"external_video_id"`
}

func GetActiveWorkflows(ctx context.Context) ([]*workflow.WorkflowExecutionInfo, error) {
w, err := temporalClient.Client.ListOpenWorkflow(ctx, &workflowservice.ListOpenWorkflowExecutionsRequest{})
type WorkflowExecutionResponse struct {
Executions []*workflow.WorkflowExecutionInfo `json:"executions"`
NextPageToken string `json:"next_page_token"`
}

func GetActiveWorkflows(ctx context.Context, inputPageToken []byte) (*WorkflowExecutionResponse, error) {
listRequest := &workflowservice.ListOpenWorkflowExecutionsRequest{
MaximumPageSize: 30,
}

if inputPageToken != nil {
listRequest.NextPageToken = inputPageToken
}

w, err := temporalClient.Client.ListOpenWorkflow(ctx, listRequest)
if err != nil {
log.Error().Err(err).Msg("failed to list open workflows")
log.Error().Err(err).Msg("failed to list closed workflows")
return nil, nil
}

return w.Executions, nil
var nextPageToken string
if w.NextPageToken != nil {
token := string(w.NextPageToken)
// base64 encode
nextPageToken = base64.StdEncoding.EncodeToString([]byte(token))
}

return &WorkflowExecutionResponse{
Executions: w.Executions,
NextPageToken: nextPageToken,
}, nil
}

func GetClosedWorkflows(ctx context.Context) ([]*workflow.WorkflowExecutionInfo, error) {
w, err := temporalClient.Client.ListClosedWorkflow(ctx, &workflowservice.ListClosedWorkflowExecutionsRequest{})
func GetClosedWorkflows(ctx context.Context, inputPageToken []byte) (*WorkflowExecutionResponse, error) {
listRequest := &workflowservice.ListClosedWorkflowExecutionsRequest{
MaximumPageSize: 30,
}

if inputPageToken != nil {
listRequest.NextPageToken = inputPageToken
}

w, err := temporalClient.Client.ListClosedWorkflow(ctx, listRequest)
if err != nil {
log.Error().Err(err).Msg("failed to list closed workflows")
return nil, nil
}

return w.Executions, nil
var nextPageToken string
if w.NextPageToken != nil {
token := string(w.NextPageToken)
// base64 encode
nextPageToken = base64.StdEncoding.EncodeToString([]byte(token))
}

return &WorkflowExecutionResponse{
Executions: w.Executions,
NextPageToken: nextPageToken,
}, nil
}

func GetWorkflowById(ctx context.Context, workflowId string, runId string) (*workflow.WorkflowExecutionInfo, error) {
Expand Down
21 changes: 19 additions & 2 deletions internal/transport/http/workflow.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package http

import (
"encoding/base64"
"net/http"

"github.com/google/uuid"
Expand All @@ -18,7 +19,15 @@ type RestartArchiveWorkflowRequest struct {
}

func (h *Handler) GetActiveWorkflows(c echo.Context) error {
executions, err := temporal.GetActiveWorkflows(c.Request().Context())
nextPageToken := c.QueryParam("next_page_token")

// base64 decode the next page token
decoded, err := base64.StdEncoding.DecodeString(nextPageToken)
if err != nil {
return err
}

executions, err := temporal.GetActiveWorkflows(c.Request().Context(), []byte(decoded))
if err != nil {
return err
}
Expand All @@ -28,7 +37,15 @@ func (h *Handler) GetActiveWorkflows(c echo.Context) error {
}

func (h *Handler) GetClosedWorkflows(c echo.Context) error {
executions, err := temporal.GetClosedWorkflows(c.Request().Context())
nextPageToken := c.QueryParam("next_page_token")

// base64 decode the next page token
decoded, err := base64.StdEncoding.DecodeString(nextPageToken)
if err != nil {
return err
}

executions, err := temporal.GetClosedWorkflows(c.Request().Context(), []byte(decoded))
if err != nil {
return err
}
Expand Down

0 comments on commit b1679ca

Please sign in to comment.