forked from chrissnell/crabby
-
Notifications
You must be signed in to change notification settings - Fork 0
/
jobs.go
167 lines (133 loc) · 3.78 KB
/
jobs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package main
import (
"context"
"fmt"
"log"
"math/rand"
"net/http"
"sync"
"time"
)
// Job holds a single job to be run
type Job struct {
Name string `yaml:"name"`
URL string `yaml:"url"`
Method string `yaml:"method"`
Type string `yaml:"type"`
Interval uint16 `yaml:"interval"`
Cookies []Cookie `yaml:"cookies,omitempty"`
Tags map[string]string `yaml:"tags,omitempty"`
}
// JobConfig holds a list of jobs to be run
type JobConfig struct {
Jobs []Job `yaml:"jobs"`
}
// JobRunner holds channels and state related to running Jobs
type JobRunner struct {
ctx context.Context
JobChan chan<- Job
WG sync.WaitGroup
Client *http.Client
Storage *Storage
}
// NewJobRunner returns as JobRunner
func NewJobRunner(ctx context.Context) *JobRunner {
jr := JobRunner{
ctx: ctx,
}
jr.JobChan = make(chan Job, 10)
return &jr
}
// runJob executes the job on a Ticker interval
func (jr *JobRunner) runJob(wg *sync.WaitGroup, j Job, seleniumServer string, storage *Storage, client *http.Client) {
jobTicker := time.NewTicker(time.Duration(j.Interval) * time.Second)
wg.Add(1)
defer wg.Done()
for {
select {
case <-jobTicker.C:
switch j.Type {
case "selenium":
RunSeleniumTest(j, seleniumServer, storage)
case "simple":
go RunSimpleTest(jr.ctx, j, storage, client)
default:
// We run Selenium tests by default
RunSeleniumTest(j, seleniumServer, storage)
}
case <-jr.ctx.Done():
log.Println("Cancellation request received. Cancelling job runner.")
return
}
}
}
// makeMetric creates a Metric for a given timing name and value
func (j *Job) makeMetric(timing string, value float64) Metric {
tags := make(map[string]string)
if len(j.Tags) != 0 {
tags = j.Tags
}
m := Metric{
Job: j.Name,
URL: j.URL,
Timing: timing,
Value: value,
Timestamp: time.Now(),
Tags: tags,
}
return m
}
// makeEvent creates an Event from a given status code
func (j *Job) makeEvent(status int) Event {
e := Event{
Name: j.Name,
ServerStatus: status,
Timestamp: time.Now(),
Tags: j.Tags,
}
// If our event had no (nil) tags, initialze the tags map so that
// we don't panic if tags are added later on.
if len(e.Tags) == 0 {
e.Tags = make(map[string]string)
}
return e
}
// StartJobs launches all configured jobs
func StartJobs(ctx context.Context, wg *sync.WaitGroup, c *Config, storage *Storage, client *http.Client) {
var jobs []Job
jobs = c.Jobs
jr := NewJobRunner(ctx)
seleniumServer := c.Selenium.URL
rand.Seed(time.Now().Unix())
for _, j := range jobs {
// Merge the global tags with the per-job tags. Per-job tags take precidence.
j.Tags = mergeTags(j.Tags, c.General.Tags)
// If we've been provided with an offset for staggering jobs, sleep for a random
// time interval (where: 0 < sleepDur < offset) before starting that job's timer
if c.Selenium.JobStaggerOffset > 0 {
sleepDur := time.Duration(rand.Int31n(c.Selenium.JobStaggerOffset*1000)) * time.Millisecond
fmt.Println("Sleeping for", sleepDur, "before starting next job")
time.Sleep(sleepDur)
}
log.Println("Launching job -> ", j.URL)
go jr.runJob(wg, j, seleniumServer, storage, client)
}
}
func mergeTags(jobTags map[string]string, globalTags map[string]string) map[string]string {
mergedTags := make(map[string]string)
// If we don't have any global tags or job tags, just return an empty map
if len(jobTags) == 0 && len(globalTags) == 0 {
return mergedTags
}
for k, v := range jobTags {
mergedTags[k] = v
}
for k, v := range globalTags {
// Add the global tags to the merged tags, but only if they weren't overriden by a job tag
_, present := mergedTags[k]
if !present {
mergedTags[k] = v
}
}
return mergedTags
}