-
Notifications
You must be signed in to change notification settings - Fork 0
/
uq.go
126 lines (102 loc) · 2.88 KB
/
uq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package uniqueue
import (
"errors"
"sync"
)
// UQ is a queue that admits each value at most once at a time. Values are
// written to the channel returned by Back and read from the channel returned
// by Front. Its constraint bookkeeping is guarded by a mutex, so the methods
// may be called from several goroutines.
//
// The uniqueness rule can be lifted temporarily for a value with
// IgnoreConstraintFor, which allows another instance of that value to enter
// the queue.
type UQ[T comparable] struct {
	// Values travel back -> queue -> front.
	back, queue, front chan T

	// constraints holds the bookkeeping entry for every value that is
	// currently tracked.
	constraints map[T]*constraint

	// mu is locked by every method that reads or writes constraints.
	mu sync.Mutex

	// AutoRemoveConstraint, if true, makes the queue remove a value's
	// constraint itself once the value has been handed to a reader of Front.
	// If false, the caller is expected to call RemoveConstraint.
	AutoRemoveConstraint bool
}
// constraint is the bookkeeping entry kept for a single value.
type constraint struct {
	// count is the number of instances of the value currently admitted; it is
	// raised by AddConstraint and lowered by RemoveConstraint.
	count uint

	// disabled, when set, lets the next AddConstraint for the value succeed
	// even though an entry already exists.
	disabled bool
}
// NewUQ builds a queue whose internal buffer holds up to size values and
// starts the background goroutines that move values from Back to Front.
// The Back and Front channels themselves are unbuffered.
//
// The background goroutines run until the channel returned by Back is closed;
// the remaining buffered values are then delivered and Front is closed.
func NewUQ[T comparable](size uint) *UQ[T] {
	q := new(UQ[T])
	q.constraints = make(map[T]*constraint)
	q.back = make(chan T)
	q.front = make(chan T)
	q.queue = make(chan T, size)

	go q.linkChannels()
	return q
}
// Back returns the write end of the queue: send values on this channel to
// offer them to the queue.
func (u *UQ[T]) Back() chan<- T {
	// Narrow the bidirectional channel to send-only for the caller.
	var in chan<- T = u.back
	return in
}
// Front returns the read end of the queue: receive from this channel to take
// values out of the queue.
func (u *UQ[T]) Front() <-chan T {
	// Narrow the bidirectional channel to receive-only for the caller.
	var out <-chan T = u.front
	return out
}
// IgnoreConstraintFor lifts the uniqueness rule for v once. It marks v's
// constraint entry as disabled, creating an empty entry first if v is not
// tracked yet. The next AddConstraint for v then succeeds and switches the
// constraint back on.
func (u *UQ[T]) IgnoreConstraintFor(v T) {
	u.mu.Lock()
	defer u.mu.Unlock()

	entry, known := u.constraints[v]
	if !known {
		entry = new(constraint)
		u.constraints[v] = entry
	}
	entry.disabled = true
}
// ErrConstraintExists is returned by AddConstraint when the value already has
// an active (not disabled) constraint. It is one shared value, so callers can
// recognise the rejection with errors.Is instead of comparing message text.
//
// The message is kept exactly as it was so existing callers that log or match
// it keep working.
var ErrConstraintExists = errors.New("Already existing constraint prevents adding new constraint")

// AddConstraint registers one instance of v. It is used internally for every
// value written to Back; call it manually only in special cases, to keep a
// value from entering the queue.
//
// It returns nil when v was not tracked yet (a new entry with count 1 is
// created) or when v's constraint was disabled by IgnoreConstraintFor (the
// count is incremented and the constraint is enabled again). Otherwise it
// returns ErrConstraintExists and changes nothing.
func (u *UQ[T]) AddConstraint(v T) error {
	u.mu.Lock()
	defer u.mu.Unlock()

	c, tracked := u.constraints[v]
	switch {
	case !tracked:
		u.constraints[v] = &constraint{count: 1}
		return nil
	case c.disabled:
		// One-shot bypass: admit this instance and re-arm the constraint.
		c.count++
		c.disabled = false
		return nil
	default:
		return ErrConstraintExists
	}
}
// RemoveConstraint releases one instance of v and deletes v's entry once no
// instances remain. It must be called by the user when AutoRemoveConstraint is
// false, for example when the constraint should only be lifted after a worker
// has finished processing the value. Calling it for an untracked value is a
// no-op.
//
// An entry whose count is already zero (such as one created by
// IgnoreConstraintFor for a value that was never queued) is deleted without
// being decremented: count is unsigned, so decrementing it would wrap around
// to the maximum value and the entry would never be removed.
func (u *UQ[T]) RemoveConstraint(v T) {
	u.mu.Lock()
	defer u.mu.Unlock()

	c, ok := u.constraints[v]
	if !ok {
		return
	}
	if c.count > 0 {
		c.count--
	}
	if c.count == 0 {
		delete(u.constraints, v)
	}
}
// linkChannels starts the two goroutines that connect the channels of the
// queue (shiftToFront and readFromBack) and blocks until both have finished.
func (u *UQ[T]) linkChannels() {
	var pumps sync.WaitGroup
	pumps.Add(2)

	go u.shiftToFront(&pumps)
	go u.readFromBack(&pumps)

	pumps.Wait()
}
// shiftToFront moves values from the internal buffer to the front channel
// until the buffer is closed and drained. It then closes the front channel
// and signals wg.
//
// When AutoRemoveConstraint is set, a value's constraint is removed right
// after the value has been handed to a receiver on the front channel.
func (u *UQ[T]) shiftToFront(wg *sync.WaitGroup) {
	// Deferred calls run last-in first-out: front is closed, then wg is signalled.
	defer wg.Done()
	defer close(u.front)

	for {
		item, open := <-u.queue
		if !open {
			return
		}
		u.front <- item
		if !u.AutoRemoveConstraint {
			continue
		}
		u.RemoveConstraint(item)
	}
}
// readFromBack receives values from the back channel until it is closed.
// Each value for which AddConstraint succeeds is placed in the internal
// buffer; values that AddConstraint rejects are dropped. When the back channel
// is closed, the buffer is closed and wg is signalled.
func (u *UQ[T]) readFromBack(wg *sync.WaitGroup) {
	// Deferred calls run last-in first-out: queue is closed, then wg is signalled.
	defer wg.Done()
	defer close(u.queue)

	for {
		item, open := <-u.back
		if !open {
			return
		}
		if u.AddConstraint(item) != nil {
			continue // rejected by the constraint: drop the value
		}
		u.queue <- item
	}
}