-
Notifications
You must be signed in to change notification settings - Fork 1
/
stream.go
162 lines (139 loc) · 3.41 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package broadcast
import (
"time"
)
// Stream ...
type Stream struct {
// Enables replaying of eventlog to newly added subscribers
AutoReplay bool
log EventLog
MaxInactivity time.Duration
stats chan chan int
subscribers []*Subscriber
register chan *Subscriber
deregister chan *Subscriber
replay chan *Connection
event chan *Event
quit chan bool
closed bool
}
// StreamRegistration ...
type StreamRegistration struct {
id string
stream *Stream
}
// newStream returns a new stream
func newStream(bufsize int) *Stream {
s := &Stream{
AutoReplay: true,
MaxInactivity: DefaultMaxInactivity,
log: make(EventLog, 0),
subscribers: make([]*Subscriber, 0),
register: make(chan *Subscriber),
deregister: make(chan *Subscriber),
replay: make(chan *Connection),
event: make(chan *Event, bufsize),
quit: make(chan bool),
}
s.run()
return s
}
func (str *Stream) run() {
go func(str *Stream) {
for {
select {
// Add new subscriber
case subscriber := <-str.register:
if str.AutoReplay {
subscriber.replay = str.replay
}
str.subscribers = append(str.subscribers, subscriber)
// Remove closed subscriber
case subscriber := <-str.deregister:
i := str.getSubscriberIndex(subscriber)
if i != -1 {
str.removeSubscriber(i)
}
// Publish event to subscribers
case event := <-str.event:
if str.AutoReplay {
str.log.Add(event)
}
for i := range str.subscribers {
str.subscribers[i].Broadcast(event)
}
// Replay events to new connections
case conn := <-str.replay:
str.log.Replay(conn)
// Kill stream if there are no users and no activity on the stream
case <-time.After(str.MaxInactivity):
if !str.hasActiveSubscribers() {
str.cleanup()
return
}
// Shutdown if the server closes
case <-str.quit:
// remove connections
str.removeAllSubscribers()
str.cleanup()
return
}
}
}(str)
}
func (str *Stream) close() {
if str.closed {
return
}
str.quit <- true
}
func (str *Stream) cleanup() {
close(str.event)
close(str.register)
close(str.deregister)
close(str.quit)
str.closed = true
}
func (str *Stream) getSubscriber(id string) *Subscriber {
for i := range str.subscribers {
if str.subscribers[i].id == id {
return str.subscribers[i]
}
}
return nil
}
func (str *Stream) getSubscriberIndex(sub *Subscriber) int {
for i := range str.subscribers {
if str.subscribers[i].id == sub.id {
return i
}
}
return -1
}
// addSubscriber will register a subscriber on a stream
func (str *Stream) addSubscriber(sub *Subscriber) {
sub.quit = str.deregister
sub.replay = str.replay
str.register <- sub
}
func (str *Stream) removeSubscriber(i int) {
str.subscribers[i].DisconnectAll()
str.subscribers = append(str.subscribers[:i], str.subscribers[i+1:]...)
}
func (str *Stream) removeAllSubscribers() {
for i := range str.subscribers {
str.subscribers[i].DisconnectAll()
}
str.subscribers = str.subscribers[:0]
}
func (str *Stream) hasActiveSubscribers() bool {
for i := range str.subscribers {
if str.subscribers[i].HasConnections() {
return true
}
}
return false
}