Skip to content

Commit

Permalink
Merge pull request #183 from twitchdev/eventsocket-fix-181-182
Browse files Browse the repository at this point in the history
EventSocket fixes for #181 #182
  • Loading branch information
Xemdo authored Oct 21, 2022
2 parents c4fe31a + 9a83f75 commit 0e6f3b1
Showing 1 changed file with 72 additions and 38 deletions.
110 changes: 72 additions & 38 deletions internal/events/mock_wss_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ var debug = false
var wsServers = [2]*WebsocketServer{}

type WebsocketServer struct {
serverId int // Int representing the ID of the server (0, 1, 2, ...)
websocketId string // UUID of the websocket. Used for subscribing via EventSub
connectionUrl string // URL used to connect to the websocket. Used for reconnect messages
connections []*WebsocketConnection // Current clients connected to this websocket
deactivatedStatus bool // Boolean used for preventing connections/messages during deactivation; Used for reconnect testing
serverId int // Int representing the ID of the server (0, 1, 2, ...)
websocketId string // UUID of the websocket. Used for subscribing via EventSub
connectionUrl string // URL used to connect to the websocket. Used for reconnect messages
connections []*WebsocketConnection // Current clients connected to this websocket
deactivatedStatus bool // Boolean used for preventing connections/messages during deactivation; Used for reconnect testing
reconnectTestTimeout int // Timeout for reconnect testing after first client connects; 0 if reconnect testing not enabled.
firstClientConnected bool // Whether or not the first client has connected (used for reconnect testing)
}

type WebsocketConnection struct {
Expand Down Expand Up @@ -71,7 +73,7 @@ func eventsubHandle(w http.ResponseWriter, r *http.Request) {
// Connection sucessful. WebSocket is open.

serverId, _ := strconv.Atoi(r.Context().Value("serverId").(string))
wsSrv := *wsServers[serverId]
wsSrv := wsServers[serverId]

// Or is it? Check for websocket set to deactivated (due to reconnect), and kick them out if so
if wsSrv.deactivatedStatus {
Expand All @@ -80,6 +82,22 @@ func eventsubHandle(w http.ResponseWriter, r *http.Request) {
return
}

// Activate reconnect testing upon first client connection
if !wsSrv.firstClientConnected {
wsSrv.firstClientConnected = true

if wsSrv.reconnectTestTimeout != 0 {
go func() {
log.Printf("First client connected; Reconnect testing enabled. Notices will be sent in %d seconds.", wsSrv.reconnectTestTimeout)

select {
case <-time.After(time.Second * time.Duration(wsSrv.reconnectTestTimeout)):
activateReconnectTest(r.Context())
}
}()
}
}

// TODO: Decline websocket if it reached 100 connections from the same application access token

// RFC3339Nano = "2022-10-04T12:38:15.548912638Z07" ; This is used by Twitch in production
Expand All @@ -94,7 +112,7 @@ func eventsubHandle(w http.ResponseWriter, r *http.Request) {
closed: false,
}
wsSrv.connections = append(wsSrv.connections, wc)
printConnections(wsSrv)
printConnections(wsSrv.serverId)

// Send "websocket_welcome" message
welcomeMsg, _ := json.Marshal(
Expand Down Expand Up @@ -141,7 +159,7 @@ func eventsubHandle(w http.ResponseWriter, r *http.Request) {
case <-pingTicker.C:
err := wc.SendMessage(websocket.PingMessage, []byte{})
if err != nil {
onCloseConnection(*wsServers[1], wc)
onCloseConnection(wsSrv.serverId, wc)
}
}
}
Expand All @@ -168,7 +186,7 @@ func eventsubHandle(w http.ResponseWriter, r *http.Request) {
)
err := wc.SendMessage(websocket.TextMessage, keepAliveMsg)
if err != nil {
onCloseConnection(*wsServers[1], wc)
onCloseConnection(wsSrv.serverId, wc)
}

if debug {
Expand All @@ -184,7 +202,7 @@ func eventsubHandle(w http.ResponseWriter, r *http.Request) {
mt, message, err := conn.ReadMessage()
if err != nil {
log.Println("read:", err)
onCloseConnection(wsSrv, wc)
onCloseConnection(wsSrv.serverId, wc)
break
}
if debug {
Expand All @@ -199,11 +217,15 @@ func eventsubHandle(w http.ResponseWriter, r *http.Request) {
}
}

func onCloseConnection(wsSrv WebsocketServer, wc *WebsocketConnection) {
func onCloseConnection(serverId int, wc *WebsocketConnection) {
// Close ping loop chan
if !wc.closed {
close(wc.pingLoopChan)
close(wc.kaLoopChan)
}
wc.closed = true
close(wc.pingLoopChan)
close(wc.kaLoopChan)

wsSrv := wsServers[serverId]

// Remove from list
c := 0
Expand All @@ -216,21 +238,22 @@ func onCloseConnection(wsSrv WebsocketServer, wc *WebsocketConnection) {
}
wsSrv.connections = append(wsSrv.connections[:c], wsSrv.connections[c+1:]...)

printConnections(wsSrv)
printConnections(wsSrv.serverId)
}

func printConnections(wsSrv WebsocketServer) {
func printConnections(serverId int) {
currentConnections := ""
wsSrv := wsServers[serverId]
for _, s := range wsSrv.connections {
currentConnections += s.clientId + ", "
}
if currentConnections != "" {
currentConnections = string(currentConnections[:len(currentConnections)-2])
}
log.Printf("[Server %v] Connections: (%d) [ %s ]", wsSrv.serverId, len(wsSrv.connections), currentConnections)
log.Printf("[Server %v] Connections: (%d) [ %s ]", serverId, len(wsSrv.connections), currentConnections)
}

func activateReconnectTest(server http.Server, ctx context.Context) {
func activateReconnectTest(ctx context.Context) {
timer := 30 // 30 seconds, as used by Twitch

serverId, _ := strconv.Atoi(ctx.Value("serverId").(string))
Expand All @@ -248,8 +271,11 @@ func activateReconnectTest(server http.Server, ctx context.Context) {
// Stop processing new messages
wsSrv.deactivatedStatus = true // This server
wsAltSrv.deactivatedStatus = false // Other server; We gotta turn it on to accept connections and whatnot
log.Printf("Server \"not accepting connections\" status: [Server 0: %v, Server 1: %v]", wsServers[0].deactivatedStatus, wsServers[1].deactivatedStatus)

log.Printf("Connections: %v", len(wsSrv.connections))
if debug {
log.Printf("Connections at time of close: %v", len(wsSrv.connections))
}

// Send reconnect notices
for _, c := range wsSrv.connections {
Expand Down Expand Up @@ -281,8 +307,7 @@ func activateReconnectTest(server http.Server, ctx context.Context) {
}

log.Printf("Reconnect notices sent for server %v", serverId)
log.Printf("Server \"not accepting connections\" status: [Server 0: %v, Server 1: %v]", wsServers[0].deactivatedStatus, wsServers[1].deactivatedStatus)
log.Printf("Use this URL for connections: %v", wsAltSrv.connectionUrl)
log.Printf("Use this new URL for connections: %v", wsAltSrv.connectionUrl)

// TODO: Transfer subscriptions to the other websocket server.

Expand Down Expand Up @@ -338,8 +363,24 @@ func StartServer(port int, enableDebug bool, reconnectTestTimer int, sslEnabled
wsSrv2Url = fmt.Sprintf("wss://localhost:%v/eventsub", port+1)
}

wsServers[0] = &WebsocketServer{0, util.RandomGUID(), wsSrv1Url, []*WebsocketConnection{}, false}
wsServers[1] = &WebsocketServer{1, util.RandomGUID(), wsSrv2Url, []*WebsocketConnection{}, true}
wsServers[0] = &WebsocketServer{
serverId: 0,
websocketId: util.RandomGUID(),
connectionUrl: wsSrv1Url,
connections: []*WebsocketConnection{},
deactivatedStatus: false,
reconnectTestTimeout: reconnectTestTimer,
firstClientConnected: false,
}
wsServers[1] = &WebsocketServer{
serverId: 1,
websocketId: util.RandomGUID(),
connectionUrl: wsSrv2Url,
connections: []*WebsocketConnection{},
deactivatedStatus: true, // 2nd server is deactivated by default. Will reactivate for reconnect testing.
reconnectTestTimeout: 0, // No reconnect testing
firstClientConnected: false,
}

RegisterHandlers(m)

Expand Down Expand Up @@ -379,17 +420,6 @@ func StartIndividualServer(port int, reconnectTestTimer int, sslEnabled bool, m
go func() {
log.Printf("Mock EventSub websocket server started on port %d", port)

go func() {
if reconnectTestTimer != 0 {
log.Printf("Reconnect testing enabled. Will be sent in %d seconds.", reconnectTestTimer)

select {
case <-time.After(time.Second * time.Duration(reconnectTestTimer)):
activateReconnectTest(s, ctx)
}
}
}()

if sslEnabled { // Open HTTP server with HTTPS support
home, _ := util.GetApplicationDir()
crtFile := filepath.Join(home, "localhost.crt")
Expand Down Expand Up @@ -460,17 +490,17 @@ window.addEventListener("load", function(evt) {
ws = new WebSocket(wsUrl);
ws.onopen = function(evt) {
print("OPEN");
print("OPEN - " + ws.url);
}
ws.onclose = function(evt) {
print("CLOSE");
print("CLOSE - " + ws.url);
ws = null;
}
ws.onmessage = function(evt) {
print("RECEIVED: " + evt.data);
print("[RECEIVED / " + new Date().toISOString() + "]: " + evt.data);
}
ws.onerror = function(evt) {
print("ERROR: " + evt.data);
print("[ERROR / " + new Date().toISOString() + "]: " + evt.data);
console.error("ERROR", evt);
}
return false;
Expand All @@ -490,6 +520,9 @@ window.addEventListener("load", function(evt) {
ws.close();
return false;
};
document.getElementById("clear").onclick = function(evt) {
output.innerHTML = "";
}
});
</script>
</head>
Expand All @@ -505,7 +538,8 @@ You can change the message and send multiple times.
<button id="open">Open</button>
<button id="close">Close</button>
<p><input id="input" type="text" value="Hello world!">
<button id="send">Send</button>
<button id="send">Send</button><br><br>
<button id="clear">Clear</button>
</form>
</td><td valign="top" width="50%">
<div id="output" style="max-height: 70vh;overflow-y: scroll;"></div>
Expand Down

0 comments on commit 0e6f3b1

Please sign in to comment.