-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
136 lines (99 loc) · 2.14 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package main
import (
"errors"
"fmt"
"math/rand"
"sync"
"time"
)
// Job is an alias for a unit of work: a function that takes no arguments and
// returns a string result together with an error.
type Job = func() (string, error)
// Result is the record a worker sends back for every job it processes.
type Result struct {
	index   int    // id of the worker that ran the job (worker stores its id argument here)
	result  string // string returned by the job's last attempt
	retries int    // number of attempts made, counted from 1
}
// main is the demo driver: it queues 20 copies of the coding job, runs them
// through ConcurrentRetry with 2 workers and 1 attempt per job, prints every
// result as it arrives, and finally reports the total elapsed time.
func main() {
	fmt.Println("this is not control group!")
	began := time.Now()

	// Every slot holds the same fake job.
	jobs := make([]Job, 20)
	for n := range jobs {
		jobs[n] = coding
	}

	// Argument order: ConcurrentRetry(tasks, workers, attempts).
	for res := range ConcurrentRetry(jobs, 2, 1) {
		fmt.Println("Result received from thread:", res)
	}

	fmt.Println("Program finished in", time.Since(began))
}
// init seeds math/rand's global generator from the current time in
// nanoseconds before main runs.
//
// NOTE(review): rand.Seed is deprecated as of Go 1.20, where the global
// generator is seeded automatically; confirm the toolchain version before
// removing this call.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
// coding is a fake job used to exercise the retry machinery. It draws a
// uniformly distributed integer in [-5, 5], sleeps for one millisecond to
// simulate work, and fails when the draw is negative (5 of the 11 possible
// values).
//
// On failure it returns "Error!" together with a non-nil error; on success it
// returns "Success!" and a nil error.
func coding() (string, error) {
	// Named lo/hi rather than min/max so the Go 1.21 builtins are not shadowed.
	const lo, hi = -5, 5

	draw := rand.Intn(hi-lo+1) + lo
	time.Sleep(time.Millisecond)
	if draw < 0 {
		return "Error!", errors.New("error occurred")
	}
	return "Success!", nil
}
// worker drains jobs from threads until the channel is closed, running each
// job up to retry times and sending exactly one Result per job on results.
//
// Parameters:
//   - id: identifier of this worker; copied into Result.index.
//   - threads: queue of jobs to execute; the worker returns once it has been
//     closed and emptied.
//   - results: destination for one Result per job received.
//   - retry: maximum number of attempts per job. When it is less than 1 the
//     job is not run and the Result carries only the worker id.
//   - wg: Done is called on it when the worker returns.
func worker(id int, threads <-chan Job, results chan<- Result, retry int, wg *sync.WaitGroup) {
	// Deferred so the WaitGroup is released on every exit path.
	defer wg.Done()

	for job := range threads {
		// Pre-fill the worker id so the Result stays attributable even when
		// no attempt is made.
		r := Result{index: id}
		for attempt := 1; attempt <= retry; attempt++ {
			res, err := job()
			// Keep the latest outcome even on failure, so the last attempt's
			// output is what gets reported once the attempts are used up.
			r.result, r.retries = res, attempt
			if err == nil {
				break
			}
		}
		results <- r
	}
}
// ConcurrentRetry runs every job in tasks on a pool of worker goroutines and
// streams one Result per job on the returned channel.
//
// Parameters:
//   - tasks: the jobs to run.
//   - concurrent: number of worker goroutines to start. If it is less than 1
//     no workers are started, so the returned channel is closed without
//     delivering any results.
//   - retry: maximum number of attempts per job, passed through to worker.
//
// The call returns immediately; the work happens in the background. The
// returned channel is closed once every worker has finished, so callers can
// simply range over it. Results arrive in completion order, not task order.
func ConcurrentRetry(tasks []Job, concurrent, retry int) <-chan Result {
	// Both channels can hold every task, so neither the producer below nor
	// the workers ever block on a send.
	threads := make(chan Job, len(tasks))
	results := make(chan Result, len(tasks))

	go func() {
		for _, task := range tasks {
			threads <- task
		}
		// Everything is queued, so close right away: workers keep draining
		// the buffered jobs and leave their range loops once it is empty.
		// This replaces polling len(threads) in a busy loop.
		close(threads)

		var wg sync.WaitGroup
		for i := 1; i <= concurrent; i++ {
			wg.Add(1)
			go worker(i, threads, results, retry, &wg)
		}

		// Close results only after every worker has returned, so no worker
		// can send on a closed channel.
		wg.Wait()
		close(results)
	}()

	return results
}