forked from taskcluster/taskcluster
-
Notifications
You must be signed in to change notification settings - Fork 0
/
taskstatus.go
386 lines (355 loc) · 11.5 KB
/
taskstatus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
package main
import (
"fmt"
"log"
"strconv"
"sync"
"time"
tcclient "github.com/taskcluster/taskcluster/v42/clients/client-go"
"github.com/taskcluster/taskcluster/v42/clients/client-go/tcqueue"
)
// Enumerate task status to aid life-cycle decision making
// Use strings for benefit of simple logging/reporting
const (
unclaimed TaskStatus = "Unclaimed"
claimed TaskStatus = "Claimed"
reclaimed TaskStatus = "Reclaimed"
aborted TaskStatus = "Aborted"
cancelled TaskStatus = "Cancelled"
succeeded TaskStatus = "Succeeded"
failed TaskStatus = "Failed"
errored TaskStatus = "Errored"
unknown TaskStatus = "Unknown"
deadlineExceeded TaskStatus = "Deadline Exceeded"
)
const (
workerShutdown TaskUpdateReason = "worker-shutdown"
malformedPayload TaskUpdateReason = "malformed-payload"
resourceUnavailable TaskUpdateReason = "resource-unavailable"
internalError TaskUpdateReason = "internal-error"
intermittentTask TaskUpdateReason = "intermittent-task"
)
type TaskStatusChangeListener struct {
Name string
Callback func(ts TaskStatus)
}
type TaskStatusManager struct {
sync.Mutex
task *TaskRun
takenUntil tcclient.Time
status tcqueue.TaskStatusStructure
// callback functions to call when status changes
statusChangeListeners map[*TaskStatusChangeListener]bool
abortException *CommandExecutionError
// closed when reclaim go routine should stop reclaiming
stopReclaiming chan<- struct{}
// closed when reclaim loop exits
reclaimingDone <-chan struct{}
// true if reclaims are no longer taking place for this task
finishedReclaiming bool
}
func (tsm *TaskStatusManager) DeregisterListener(listener *TaskStatusChangeListener) {
// https://bugzil.la/1619925
// This lock ensures that any currently executing callbacks complete before
// the listener is deregistered, and that no new callbacks are scheduled
// once this method has started.
tsm.Lock()
defer tsm.Unlock()
// if Start() failed, a listener might not be registered, so check before deleting it...
if tsm.statusChangeListeners[listener] {
delete(tsm.statusChangeListeners, listener)
}
}
func (tsm *TaskStatusManager) RegisterListener(listener *TaskStatusChangeListener) {
tsm.Lock()
defer tsm.Unlock()
tsm.statusChangeListeners[listener] = true
}
type TaskStatusUpdateError struct {
Message string
CurrentStatus TaskStatus
}
func (ue *TaskStatusUpdateError) Error() string {
return ue.Message + " (current status: " + string(ue.CurrentStatus) + ")"
}
func (tsm *TaskStatusManager) ReportException(reason TaskUpdateReason) error {
return tsm.updateStatus(
errored,
func(task *TaskRun) error {
tsm.stopReclaims()
ter := tcqueue.TaskExceptionRequest{Reason: string(reason)}
task.queueMux.RLock()
tsr, err := task.Queue.ReportException(task.TaskID, strconv.FormatInt(int64(task.RunID), 10), &ter)
task.queueMux.RUnlock()
if err != nil {
log.Printf("Not able to report exception for task %v:", task.TaskID)
log.Printf("%v", err)
return err
}
tsm.status = tsr.Status
tsm.takenUntil = tcclient.Time{}
return nil
},
claimed,
reclaimed,
aborted,
)
}
func (tsm *TaskStatusManager) ReportFailed() error {
return tsm.updateStatus(
failed,
func(task *TaskRun) error {
tsm.stopReclaims()
task.queueMux.RLock()
tsr, err := task.Queue.ReportFailed(task.TaskID, strconv.FormatInt(int64(task.RunID), 10))
task.queueMux.RUnlock()
if err != nil {
log.Printf("Not able to report failed completion for task %v:", task.TaskID)
log.Printf("%v", err)
return err
}
tsm.status = tsr.Status
tsm.takenUntil = tcclient.Time{}
return nil
},
claimed,
reclaimed,
aborted,
)
}
func (tsm *TaskStatusManager) ReportCompleted() error {
return tsm.updateStatus(
succeeded,
func(task *TaskRun) error {
tsm.stopReclaims()
log.Printf("Task %v finished successfully!", task.TaskID)
task.queueMux.RLock()
tsr, err := task.Queue.ReportCompleted(task.TaskID, strconv.FormatInt(int64(task.RunID), 10))
task.queueMux.RUnlock()
if err != nil {
log.Printf("Not able to report successful completion for task %v:", task.TaskID)
log.Printf("%v", err)
return err
}
tsm.status = tsr.Status
tsm.takenUntil = tcclient.Time{}
return nil
},
claimed,
reclaimed,
)
}
func (tsm *TaskStatusManager) reclaim() error {
return tsm.updateStatus(
reclaimed,
func(task *TaskRun) error {
log.Printf("Reclaiming task %v...", task.TaskID)
task.queueMux.RLock()
tcrsp, err := task.Queue.ReclaimTask(task.TaskID, fmt.Sprintf("%d", task.RunID))
task.queueMux.RUnlock()
// check if an error occurred...
if err != nil {
// probably task was cancelled - in any case, we should kill the running task...
log.Printf("%v", err)
task.kill()
return err
}
task.TaskReclaimResponse = *tcrsp
task.queueMux.Lock()
task.Queue = serviceFactory.Queue(
&tcclient.Credentials{
AccessToken: tcrsp.Credentials.AccessToken,
ClientID: tcrsp.Credentials.ClientID,
Certificate: tcrsp.Credentials.Certificate,
},
config.RootURL,
)
task.queueMux.Unlock()
tsm.status = tcrsp.Status
tsm.takenUntil = tcrsp.TakenUntil
if err != nil {
log.Printf("SERIOUS BUG: invalid credentials in queue claim response body: %v", err)
}
log.Printf("Reclaimed task %v successfully.", task.TaskID)
return nil
},
claimed,
reclaimed,
)
}
func (tsm *TaskStatusManager) AbortException() *CommandExecutionError {
tsm.Lock()
defer tsm.Unlock()
return tsm.abortException
}
func (tsm *TaskStatusManager) TakenUntil() tcclient.Time {
tsm.Lock()
defer tsm.Unlock()
return tsm.takenUntil
}
func (tsm *TaskStatusManager) Abort(cee *CommandExecutionError) error {
return tsm.updateStatus(
aborted,
func(task *TaskRun) error {
task.Errorf("Aborting task...")
task.kill()
tsm.abortException = cee
return nil
},
claimed,
reclaimed,
// Since task abortion may take a while, we should continue reclaiming
// until we resolve the task. If we didn't do this, but immediately
// stopped reclaiming as soon as we had a task abortion, if the task
// abortion took too long to complete, the claim might expire before
// the task finally gets resolved after the task abortion completes.
aborted,
)
}
func (tsm *TaskStatusManager) Cancel() error {
return tsm.updateStatus(
cancelled,
func(task *TaskRun) error {
//TODO: implement cancelling of tasks
return nil
},
claimed,
reclaimed,
aborted,
)
}
func (tsm *TaskStatusManager) LastKnownStatus() TaskStatus {
tsm.Lock()
defer tsm.Unlock()
return tsm.task.Status
}
// Queries the queue to get the latest status. Note, it can't recognise
// internal states claimed/reclaimed/aborted but is useful for setting
// failed/cancelled/pending/completed/exception.
func (tsm *TaskStatusManager) UpdateStatus() {
tsm.Lock()
defer tsm.Unlock()
tsm.queryQueueForLatestStatus()
}
func (tsm *TaskStatusManager) queryQueueForLatestStatus() {
log.Printf("Querying queue to get latest status for task %v...", tsm.task.TaskID)
// no scopes are required for this endpoint, so can use nil credentials
// this is also useful if tsm.task.Queue == nil (which can happen if claim
// failed because task is claimed by another worker)
queue := serviceFactory.Queue(nil, config.RootURL)
tsr, err := queue.Status(tsm.task.TaskID)
if err != nil {
tsm.task.Status = unknown
return
}
taskStatus := tsr.Status.Runs[tsm.task.RunID]
switch {
case taskStatus.ReasonResolved == "failed":
tsm.task.Status = failed
case taskStatus.ReasonResolved == "canceled":
tsm.task.Status = cancelled
case taskStatus.ReasonResolved == "deadline-exceeded":
tsm.task.Status = deadlineExceeded
case taskStatus.State == "pending":
tsm.task.Status = unclaimed
case taskStatus.State == "completed":
tsm.task.Status = succeeded
case taskStatus.State == "exception":
tsm.task.Status = errored
}
log.Printf("Latest status: %v", tsm.task.Status)
}
func (tsm *TaskStatusManager) updateStatus(ts TaskStatus, f func(task *TaskRun) error, fromStatuses ...TaskStatus) error {
tsm.Lock()
defer tsm.Unlock()
currentStatus := tsm.task.Status
for _, allowedStatus := range fromStatuses {
if currentStatus == allowedStatus {
e := f(tsm.task)
if e != nil {
tsm.queryQueueForLatestStatus()
return &TaskStatusUpdateError{
Message: e.Error(),
CurrentStatus: tsm.task.Status,
}
}
tsm.task.Status = ts
for listener := range tsm.statusChangeListeners {
log.Printf("Notifying listener %v of state change", listener.Name)
listener.Callback(ts)
}
return nil
}
}
warning := fmt.Sprintf("Not updating status of task %v run %v from %v to %v. This is because you can only update to status %v if the previous status was one of: %v", tsm.task.TaskID, tsm.task.RunID, tsm.task.Status, ts, ts, fromStatuses)
log.Print(warning)
return &TaskStatusUpdateError{
Message: warning,
CurrentStatus: tsm.task.Status,
}
}
func NewTaskStatusManager(task *TaskRun) *TaskStatusManager {
stopReclaiming := make(chan struct{})
reclaimingDone := make(chan struct{})
tsm := &TaskStatusManager{
task: task,
takenUntil: task.TaskClaimResponse.TakenUntil,
status: task.TaskClaimResponse.Status,
statusChangeListeners: map[*TaskStatusChangeListener]bool{},
stopReclaiming: stopReclaiming,
reclaimingDone: reclaimingDone,
}
// Reclaiming Tasks
// ----------------
// When the worker has claimed a task, it's said to have a claim to a given
// `taskId`/`runId`. This claim has an expiration, see the `takenUntil`
// property in the _task status structure_ returned from `tcqueue.ClaimTask`
// and `tcqueue.ReclaimTask`. A worker must call `tcqueue.ReclaimTask` before
// the claim denoted in `takenUntil` expires. It's recommended that this
// attempted a few minutes prior to expiration, to allow for clock drift.
go func() {
defer close(reclaimingDone)
for {
var waitTimeUntilReclaim time.Duration
if reclaimEvery5Seconds {
// Reclaim in 5 seconds...
waitTimeUntilReclaim = time.Second * 5
} else {
// Reclaim 3 mins before current claim expires...
takenUntil := time.Time(tsm.TakenUntil())
reclaimTime := takenUntil.Add(time.Minute * -3)
// Round(0) forces wall time calculation instead of monotonic time in case machine slept etc
waitTimeUntilReclaim = time.Until(reclaimTime.Round(0))
log.Printf("Reclaiming task %v at %v", task.TaskID, reclaimTime)
log.Printf("Current task claim expires at %v", takenUntil)
// sanity check - only set an alarm, if wait time >= 30s, so we can't hammer queue in production
if waitTimeUntilReclaim.Seconds() < 30 {
log.Printf("WARNING: This is less than 30 seconds away. NOT setting a reclaim timer for task %v to avoid hammering queue if this is a bug.", task.TaskID)
return
}
}
log.Printf("Reclaiming task %v in %v", task.TaskID, waitTimeUntilReclaim)
select {
case <-stopReclaiming:
return
case <-time.After(waitTimeUntilReclaim):
log.Printf("About to reclaim task %v...", task.TaskID)
err := tsm.reclaim()
if err != nil {
log.Printf("ERROR: Encountered exception when reclaiming task %v - giving up retrying: %v", task.TaskID, err)
return
}
log.Printf("Successfully reclaimed task %v", task.TaskID)
}
}
}()
return tsm
}
// stopReclaims() must be called when tsm.Lock() is held by caller
func (tsm *TaskStatusManager) stopReclaims() {
if !tsm.finishedReclaiming {
close(tsm.stopReclaiming)
<-tsm.reclaimingDone
tsm.finishedReclaiming = true
}
}