-
Notifications
You must be signed in to change notification settings - Fork 1
/
replaytest.go
124 lines (117 loc) · 4.31 KB
/
replaytest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package tempts
import (
"context"
"encoding/json"
"fmt"
"github.com/gogo/protobuf/proto"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/filter/v1"
"go.temporal.io/api/history/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)
// GetWorkflowHistoriesBundle connects to the temporal server and fetches the most recent 10 open and 10 closed executions.
// It returns a byte seralized piece of data that can be used immediately or in the future to call ReplayWorkflow.
func GetWorkflowHistoriesBundle(ctx context.Context, client *Client, w WorkflowDeclaration) ([]byte, error) {
closedExecutions, err := client.Client.WorkflowService().ListClosedWorkflowExecutions(ctx, &workflowservice.ListClosedWorkflowExecutionsRequest{
Namespace: client.namespace,
MaximumPageSize: 10,
Filters: &workflowservice.ListClosedWorkflowExecutionsRequest_TypeFilter{
TypeFilter: &filter.WorkflowTypeFilter{Name: w.Name()},
},
})
if err != nil {
return nil, fmt.Errorf("failed to get closed executions: %w", err)
}
openExecutions, err := client.Client.WorkflowService().ListOpenWorkflowExecutions(ctx, &workflowservice.ListOpenWorkflowExecutionsRequest{
Namespace: client.namespace,
MaximumPageSize: 10,
Filters: &workflowservice.ListOpenWorkflowExecutionsRequest_TypeFilter{
TypeFilter: &filter.WorkflowTypeFilter{Name: w.Name()},
},
})
if err != nil {
return nil, fmt.Errorf("failed to get open executions: %w", err)
}
allExecutions := append(closedExecutions.Executions, openExecutions.Executions...)
hists := []*history.History{}
for _, e := range allExecutions {
var hist history.History
fmt.Println(e.Execution)
iter := client.Client.GetWorkflowHistory(ctx, e.Execution.WorkflowId, e.Execution.RunId, false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
for iter.HasNext() {
event, err := iter.Next()
if err != nil {
return nil, fmt.Errorf("failed to get history: %w", err)
}
hist.Events = append(hist.Events, event)
}
hists = append(hists, &hist)
}
historiesData := historiesData{
WorkflowName: w.Name(),
}
for i, h := range hists {
if len(h.Events) < 3 {
// The relay code requires history to have at least 3 events, so 2 even histories are considered invalid.
continue
}
hBytes, err := proto.Marshal(h)
if err != nil {
return nil, fmt.Errorf("failed to marshal history: %s", err)
}
historiesData.Histories = append(historiesData.Histories, historyWithMetadata{
WorkflowID: allExecutions[i].Execution.WorkflowId,
RunID: allExecutions[i].Execution.RunId,
HistoryBytes: hBytes,
})
}
historiesBytes, err := json.Marshal(historiesData)
if err != nil {
return nil, fmt.Errorf("failed to marshal histories: %w", err)
}
return historiesBytes, nil
}
type historyWithMetadata struct {
WorkflowID string
RunID string
HistoryBytes []byte
}
type historiesData struct {
Histories []historyWithMetadata
WorkflowName string
}
// ReplayWorkflow is meant to be used in tests with the output of GetWorkflowHistoriesBundle to check if the given workflow implementation (fn) is compatible with previous executions captured at the time when GetWorkflowHistoriesBundle was run.
func ReplayWorkflow(historiesBytes []byte, fn any, opts worker.WorkflowReplayerOptions) error {
var historiesData historiesData
err := json.Unmarshal(historiesBytes, &historiesData)
if err != nil {
return fmt.Errorf("failed to unmarshal histories: %w", err)
}
if len(historiesData.Histories) == 0 {
return fmt.Errorf("no histories available")
}
replayer, err := worker.NewWorkflowReplayerWithOptions(opts)
if err != nil {
return fmt.Errorf("failed to create replayer: %w", err)
}
replayer.RegisterWorkflowWithOptions(fn, workflow.RegisterOptions{
Name: historiesData.WorkflowName,
})
for _, histData := range historiesData.Histories {
var h history.History
err := proto.Unmarshal(histData.HistoryBytes, &h)
if err != nil {
return fmt.Errorf("failed to unmarshal history: %w", err)
}
opts := worker.ReplayWorkflowHistoryOptions{}
opts.OriginalExecution.ID = histData.WorkflowID
opts.OriginalExecution.RunID = histData.RunID
err = replayer.ReplayWorkflowHistoryWithOptions(nil, &h, opts)
if err != nil {
return fmt.Errorf("failed to replay workflow %s: %w", historiesData.WorkflowName, err)
}
}
return nil
}