-
Notifications
You must be signed in to change notification settings - Fork 1
/
broadcaster.go
131 lines (114 loc) · 2.97 KB
/
broadcaster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Package broadcaster allows broadcasting a value over many channels at once.
package broadcaster
import (
"sync"
"time"
)
const (
// defaultWait is how long a send to a single subscriber is attempted when
// no Options are supplied or Options.WaitTime is left at zero.
defaultWait = time.Second
)
// Options allows specifying additional options when creating a broadcaster.
// A nil *Options is valid and selects the defaults.
type Options struct {
// WaitTime is how long to wait on a subscriber's channel before giving up
// on a message. A zero value selects the default of 1s.
WaitTime time.Duration
}
// waitTime reports the per-subscriber send timeout to use. An explicitly set
// (non-zero) WaitTime wins; a nil receiver or a zero WaitTime falls back to
// defaultWait.
func (o *Options) waitTime() time.Duration {
	if o != nil && o.WaitTime != 0 {
		return o.WaitTime
	}
	return defaultWait
}
// Caster allows sending messages to many subscribed channels.
type Caster interface {
// Subscribe returns a channel that receives broadcast messages. Closing the
// supplied done channel unsubscribes and closes the returned channel; any
// send to it that is still in progress is allowed to complete or time out
// (after the WaitTime) before the channel is closed. Subscribing after
// Close returns an already-closed channel.
Subscribe(done <-chan struct{}) <-chan interface{}
// Cast sends a message to be broadcast to all subscribers.
Cast(msg interface{})
// Close closes all created channels. Calling Close a second time panics,
// and Cast must not be used after Close because it sends on a channel that
// shutdown closes. Further subscriptions will return a closed channel.
Close()
}
// subber is the broadcaster's record of a single subscription.
type subber struct {
// ch is the send side of the channel handed back by Subscribe.
ch chan<- interface{}
// wg counts sends to ch that are in progress; ch is closed only after wg
// has drained to zero.
wg sync.WaitGroup
}
// broadcaster is the Caster implementation returned by New. The subbers slice
// is read and written only by the run goroutine; the exported methods talk to
// that goroutine through the channels below.
type broadcaster struct {
// subbers holds the current subscriptions.
subbers []*subber
// cast carries messages from Cast to run.
cast chan interface{}
// join and leave carry subscriber channels to be added or removed.
join, leave chan chan interface{}
// finish is closed by Close to shut the broadcaster down.
finish chan struct{}
// waitTime bounds each send attempt to a subscriber.
waitTime time.Duration
}
// New returns a Caster whose event loop is already running in its own
// goroutine. At most one *Options is honored: the first argument, if present.
// Any further arguments are ignored, and passing none (or nil) selects the
// defaults.
func New(opts ...*Options) Caster {
	var chosen *Options
	if len(opts) != 0 {
		chosen = opts[0]
	}

	b := new(broadcaster)
	b.waitTime = chosen.waitTime() // safe on nil: falls back to the default
	b.cast = make(chan interface{})
	b.finish = make(chan struct{})
	b.join = make(chan chan interface{})
	b.leave = make(chan chan interface{})

	go b.run()
	return b
}
// trySend attempts to deliver m to s, giving up once b.waitTime has elapsed
// without the subscriber receiving it. The caller must have called
// s.wg.Add(1) beforehand; trySend balances it with Done on every path.
func (b *broadcaster) trySend(s *subber, m interface{}) {
	defer s.wg.Done()

	// An explicit timer (rather than time.After) can be stopped as soon as the
	// send succeeds, so a prompt subscriber does not leave a timer pending for
	// the remainder of waitTime.
	timeout := time.NewTimer(b.waitTime)
	defer timeout.Stop()

	select {
	case s.ch <- m:
	case <-timeout.C:
		// The subscriber was too slow; the message is dropped for it.
	}
}
// run is the broadcaster's event loop. It is the only goroutine that touches
// b.subbers, and it serves four kinds of events until finish is closed:
//
//   - join: register a new subscriber channel;
//   - leave: drop a subscriber and close its channel;
//   - cast: offer a message to every current subscriber;
//   - finish: close the cast channel and every subscriber channel, then exit.
func (b *broadcaster) run() {
	for {
		select {
		case ch := <-b.join:
			b.subbers = append(b.subbers, &subber{ch: ch})

		case ch := <-b.leave:
			b.drop(ch)

		case m := <-b.cast:
			// Sends are made one subscriber at a time, so each subscriber sees
			// messages in Cast order; the flip side is that an unresponsive
			// subscriber holds up the rest for up to waitTime.
			for _, s := range b.subbers {
				s.wg.Add(1)
				b.trySend(s, m)
			}

		case <-b.finish:
			// Closing cast makes any later Cast panic instead of blocking.
			close(b.cast)
			for _, s := range b.subbers {
				go s.closeWhenIdle()
			}
			return
		}
	}
}

// drop removes the subscription registered for ch, if there is one, and
// arranges for the channel to be closed. Each channel is registered at most
// once, so the scan stops at the first match rather than continuing over a
// slice that has just been modified.
func (b *broadcaster) drop(ch chan interface{}) {
	for i, s := range b.subbers {
		if s.ch != ch {
			continue
		}
		last := len(b.subbers) - 1
		copy(b.subbers[i:], b.subbers[i+1:])
		b.subbers[last] = nil // do not keep the removed subber reachable
		b.subbers = b.subbers[:last]
		go s.closeWhenIdle()
		return
	}
}

// closeWhenIdle closes the subscriber's channel once no send to it is in
// progress.
func (s *subber) closeWhenIdle() {
	s.wg.Wait()
	close(s.ch)
}
// Subscribe registers a new subscriber and returns the channel on which it
// will receive broadcast messages. Closing done unsubscribes: the broadcaster
// drops the subscriber and closes the returned channel. If the broadcaster
// has already been closed, the returned channel is closed immediately.
//
// done may be nil or may never be closed; the subscription then lasts until
// the broadcaster itself is closed.
func (b *broadcaster) Subscribe(done <-chan struct{}) <-chan interface{} {
	ch := make(chan interface{})

	select {
	case b.join <- ch:
	case <-b.finish:
		// Nobody will ever service this subscription; hand back a closed
		// channel so receivers do not block.
		close(ch)
		return ch
	}

	go func() {
		// Wait for the caller to unsubscribe, but give up as soon as the
		// broadcaster shuts down: shutdown closes every subscriber channel
		// itself, and waiting on done alone would strand this goroutine
		// forever when done is nil or never closed.
		select {
		case <-done:
		case <-b.finish:
			return
		}
		select {
		case b.leave <- ch:
		case <-b.finish:
		}
	}()
	return ch
}
// Cast hands msg to the broadcaster's event loop for delivery to the current
// subscribers. The cast channel is unbuffered, so Cast blocks until the loop
// accepts the message.
func (b *broadcaster) Cast(msg interface{}) {
	b.cast <- msg
}
// Close signals shutdown by closing the finish channel. A second call panics
// because the channel is already closed.
func (b *broadcaster) Close() {
	close(b.finish)
}