forked from decred/dcrdata
-
Notifications
You must be signed in to change notification settings - Fork 1
/
websocket.go
164 lines (145 loc) · 4.35 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// Copyright (c) 2017, Jonathan Chappelow
// See LICENSE for details.
package main
import (
"sync"
apitypes "github.com/decred/dcrdata/dcrdataapi"
)
// WebSocketMessage is the JSON envelope for typed messages sent to and
// received from the web client. EventId is encoded under the "event" key and
// Messsage under the "message" key.
type WebSocketMessage struct {
	EventId  string `json:"event"`
	Messsage string `json:"message"`
}
// eventIDs associates each hub signal with its string event identifier.
// NOTE(review): presumably these strings populate the "event" field of a
// WebSocketMessage; confirm against the code that reads this map.
var eventIDs = map[hubSignal]string{
	sigNewBlock:             "newblock",
	sigMempoolFeeInfoUpdate: "mempoolsstxfeeinfo",
	sigPingAndUserCount:     "ping",
}
// WebBlockInfo is the JSON object carrying block data and stake info to the
// web client. The basic block data is encoded under the "block" key and the
// extended stake info estimates under the "stake" key.
type WebBlockInfo struct {
	BlockDataBasic *apitypes.BlockExplorerBasic         `json:"block"`
	StakeInfoExt   *apitypes.StakeInfoExtendedEstimates `json:"stake"`
}
// WebsocketHub, together with its run() event loop, manages every websocket
// client connection registered with it, and it is the hub's job to close
// those connections. When the event loop is running, (*WebsocketHub).Stop()
// shuts the loop down and unregisters all clients.
type WebsocketHub struct {
	// Embedded lock. NOTE(review): no method in this file acquires it;
	// confirm whether other files rely on it.
	sync.RWMutex

	// clients is the set of currently registered client channels.
	clients map[*hubSpoke]struct{}

	// Register and Unregister carry client (un)registration requests to the
	// run() loop.
	Register   chan *hubSpoke
	Unregister chan *hubSpoke

	// HubRelay receives the signals that run() forwards to every client.
	HubRelay chan hubSignal

	// NewBlockInfo is created by NewWebsocketHub. NewBlockSummary and
	// NewStakeSummary are not created there and stay nil unless set elsewhere.
	NewBlockInfo    chan WebBlockInfo
	NewBlockSummary chan apitypes.BlockDataBasic
	NewStakeSummary chan apitypes.StakeInfoExtendedEstimates

	// quitWSHandler tells run() to exit; run() closes it on the way out.
	quitWSHandler chan struct{}
}
type (
	// hubSignal identifies the kind of event the hub relays to its clients.
	hubSignal int

	// hubSpoke is a client's channel on which it receives hub signals.
	hubSpoke chan hubSignal
)

// The hub signals, numbered consecutively from zero.
const (
	sigNewBlock             hubSignal = iota // 0
	sigMempoolFeeInfoUpdate                  // 1
	sigPingAndUserCount                      // 2
)
// NewWebsocketHub allocates a WebsocketHub with an empty client set and
// unbuffered Register, Unregister, HubRelay, NewBlockInfo and quit channels.
// The NewBlockSummary and NewStakeSummary channels are not created here and
// are left nil.
func NewWebsocketHub() *WebsocketHub {
	hub := new(WebsocketHub)
	hub.clients = make(map[*hubSpoke]struct{})
	hub.Register = make(chan *hubSpoke)
	hub.Unregister = make(chan *hubSpoke)
	hub.HubRelay = make(chan hubSignal)
	hub.NewBlockInfo = make(chan WebBlockInfo)
	hub.quitWSHandler = make(chan struct{})
	return hub
}
// NumClients reports how many clients are currently registered with the
// websocket hub.
// NOTE(review): the client set is read here without holding any lock, while
// run() adds and removes entries; confirm callers tolerate or avoid that.
func (wsh *WebsocketHub) NumClients() int {
	count := len(wsh.clients)
	return count
}
// RegisterClient asks the hub to register the client channel c by sending it
// on the Register channel, which the run() loop receives from. A debug
// message is logged before the send.
func (wsh *WebsocketHub) RegisterClient(c *hubSpoke) {
	log.Debug("Registering new websocket client")
	registrations := wsh.Register
	registrations <- c
}
// registerClient adds c to the hub's client set. It must only be invoked from
// the run loop.
func (wsh *WebsocketHub) registerClient(c *hubSpoke) {
	var member struct{}
	wsh.clients[c] = member
}
// UnregisterClient asks the hub to drop the client channel c by sending it on
// the Unregister channel, which the main run() loop receives from. The call
// blocks for as long as the run() loop is not running.
func (wsh *WebsocketHub) UnregisterClient(c *hubSpoke) {
	removals := wsh.Unregister
	removals <- c
}
// unregisterClient removes c from the client set and closes its channel. A
// client that is not in the set is only reported with a warning; its channel
// is left alone. It must only be invoked from the loop in run().
func (wsh *WebsocketHub) unregisterClient(c *hubSpoke) {
	_, known := wsh.clients[c]
	if known {
		delete(wsh.clients, c)
		// The client may have closed the channel itself, so close defensively.
		safeClose(*c)
		return
	}

	// Not one of ours: never close a channel the hub does not track.
	log.Warnf("unknown client")
}
// safeClose closes cc unless a non-blocking receive shows that it is already
// closed, in which case it logs a debug message and returns. If that receive
// happens to pick up a pending value, the value is discarded and the channel
// is still closed.
func safeClose(cc hubSpoke) {
	alreadyClosed := false

	// Probe without blocking: a receive that reports !open means closed.
	select {
	case _, open := <-cc:
		alreadyClosed = !open
	default:
	}

	if alreadyClosed {
		log.Debug("Channel already closed!")
		return
	}
	close(cc)
}
// Stop ends the run() loop and then unregisters every client (connection)
// still in the hub's client set. The quit signal is sent on quitWSHandler, so
// the call blocks until the run() loop receives it.
func (wsh *WebsocketHub) Stop() {
	// Tell the run() loop to exit; operations already in progress finish first.
	var quit struct{}
	wsh.quitWSHandler <- quit

	// Drop whatever clients remain.
	for spoke := range wsh.clients {
		wsh.unregisterClient(spoke)
	}
}
// run is the WebsocketHub event loop. It blocks, handling one event per
// iteration, until a value is received on quitWSHandler:
//
//   - HubRelay: a recognized signal is logged and offered to every registered
//     client with a non-blocking send. A client that is not ready to receive
//     is unregistered, which also closes its channel. An unrecognized signal
//     is logged as an error and dropped.
//   - Register / Unregister: the client is added to or removed from the
//     client set.
//   - quitWSHandler: the channel is closed and the loop returns.
//
// unregisterClient is always called inline, never in a new goroutine, so the
// clients map is not modified concurrently with this loop's iteration over it.
func (wsh *WebsocketHub) run() {
	log.Info("Starting WebsocketHub run loop.")

	for {
		select {
		case sig := <-wsh.HubRelay:
			switch sig {
			case sigNewBlock:
				log.Infof("Signaling new block to %d clients.", len(wsh.clients))
			case sigMempoolFeeInfoUpdate:
				log.Infof("Signaling mempool info update to %d clients.", len(wsh.clients))
			case sigPingAndUserCount:
				log.Tracef("Signaling ping/user count to %d clients.", len(wsh.clients))
			default:
				log.Errorf("Unknown hub signal: %v", sig)
				// Nothing to relay; wait for the next event.
				continue
			}

			for client := range wsh.clients {
				// Signal the client, or unregister it if it cannot receive
				// right now. Deleting the current entry while ranging over a
				// map is permitted, and unregisterClient does not block.
				select {
				case *client <- sig:
				default:
					wsh.unregisterClient(client)
				}
			}

		case c := <-wsh.Register:
			wsh.registerClient(c)

		case c := <-wsh.Unregister:
			wsh.unregisterClient(c)

		case _, ok := <-wsh.quitWSHandler:
			if !ok {
				log.Error("close channel already closed. This should not happen.")
				return
			}
			close(wsh.quitWSHandler)
			return
		}
	}
}