diff --git a/internal/events/mock_wss_server/server.go b/internal/events/mock_wss_server/server.go index c350c9e8..04b43e69 100644 --- a/internal/events/mock_wss_server/server.go +++ b/internal/events/mock_wss_server/server.go @@ -33,11 +33,13 @@ var debug = false var wsServers = [2]*WebsocketServer{} type WebsocketServer struct { - serverId int // Int representing the ID of the server (0, 1, 2, ...) - websocketId string // UUID of the websocket. Used for subscribing via EventSub - connectionUrl string // URL used to connect to the websocket. Used for reconnect messages - connections []*WebsocketConnection // Current clients connected to this websocket - deactivatedStatus bool // Boolean used for preventing connections/messages during deactivation; Used for reconnect testing + serverId int // Int representing the ID of the server (0, 1, 2, ...) + websocketId string // UUID of the websocket. Used for subscribing via EventSub + connectionUrl string // URL used to connect to the websocket. Used for reconnect messages + connections []*WebsocketConnection // Current clients connected to this websocket + deactivatedStatus bool // Boolean used for preventing connections/messages during deactivation; Used for reconnect testing + reconnectTestTimeout int // Timeout for reconnect testing after first client connects; 0 if reconnect testing not enabled. + firstClientConnected bool // Whether or not the first client has connected (used for reconnect testing) } type WebsocketConnection struct { @@ -71,7 +73,7 @@ func eventsubHandle(w http.ResponseWriter, r *http.Request) { // Connection sucessful. WebSocket is open. serverId, _ := strconv.Atoi(r.Context().Value("serverId").(string)) - wsSrv := *wsServers[serverId] + wsSrv := wsServers[serverId] // Or is it? Check for websocket set to deactivated (due to reconnect), and kick them out if so if wsSrv.deactivatedStatus { @@ -80,6 +82,22 @@ func eventsubHandle(w http.ResponseWriter, r *http.Request) { return } + // Activate reconnect testing upon first client connection + if !wsSrv.firstClientConnected { + wsSrv.firstClientConnected = true + + if wsSrv.reconnectTestTimeout != 0 { + go func() { + log.Printf("First client connected; Reconnect testing enabled. Notices will be sent in %d seconds.", wsSrv.reconnectTestTimeout) + + select { + case <-time.After(time.Second * time.Duration(wsSrv.reconnectTestTimeout)): + activateReconnectTest(r.Context()) + } + }() + } + } + // TODO: Decline websocket if it reached 100 connections from the same application access token // RFC3339Nano = "2022-10-04T12:38:15.548912638Z07" ; This is used by Twitch in production @@ -94,7 +112,7 @@ func eventsubHandle(w http.ResponseWriter, r *http.Request) { closed: false, } wsSrv.connections = append(wsSrv.connections, wc) - printConnections(wsSrv) + printConnections(wsSrv.serverId) // Send "websocket_welcome" message welcomeMsg, _ := json.Marshal( @@ -141,7 +159,7 @@ func eventsubHandle(w http.ResponseWriter, r *http.Request) { case <-pingTicker.C: err := wc.SendMessage(websocket.PingMessage, []byte{}) if err != nil { - onCloseConnection(*wsServers[1], wc) + onCloseConnection(wsSrv.serverId, wc) } } } @@ -168,7 +186,7 @@ func eventsubHandle(w http.ResponseWriter, r *http.Request) { ) err := wc.SendMessage(websocket.TextMessage, keepAliveMsg) if err != nil { - onCloseConnection(*wsServers[1], wc) + onCloseConnection(wsSrv.serverId, wc) } if debug { @@ -184,7 +202,7 @@ func eventsubHandle(w http.ResponseWriter, r *http.Request) { mt, message, err := conn.ReadMessage() if err != nil { log.Println("read:", err) - onCloseConnection(wsSrv, wc) + onCloseConnection(wsSrv.serverId, wc) break } if debug { @@ -199,11 +217,15 @@ func eventsubHandle(w http.ResponseWriter, r *http.Request) { } } -func onCloseConnection(wsSrv WebsocketServer, wc *WebsocketConnection) { +func onCloseConnection(serverId int, wc *WebsocketConnection) { // Close ping loop chan + if !wc.closed { + close(wc.pingLoopChan) + close(wc.kaLoopChan) + } wc.closed = true - close(wc.pingLoopChan) - close(wc.kaLoopChan) + + wsSrv := wsServers[serverId] // Remove from list c := 0 @@ -216,21 +238,22 @@ func onCloseConnection(wsSrv WebsocketServer, wc *WebsocketConnection) { } wsSrv.connections = append(wsSrv.connections[:c], wsSrv.connections[c+1:]...) - printConnections(wsSrv) + printConnections(wsSrv.serverId) } -func printConnections(wsSrv WebsocketServer) { +func printConnections(serverId int) { currentConnections := "" + wsSrv := wsServers[serverId] for _, s := range wsSrv.connections { currentConnections += s.clientId + ", " } if currentConnections != "" { currentConnections = string(currentConnections[:len(currentConnections)-2]) } - log.Printf("[Server %v] Connections: (%d) [ %s ]", wsSrv.serverId, len(wsSrv.connections), currentConnections) + log.Printf("[Server %v] Connections: (%d) [ %s ]", serverId, len(wsSrv.connections), currentConnections) } -func activateReconnectTest(server http.Server, ctx context.Context) { +func activateReconnectTest(ctx context.Context) { timer := 30 // 30 seconds, as used by Twitch serverId, _ := strconv.Atoi(ctx.Value("serverId").(string)) @@ -248,8 +271,11 @@ func activateReconnectTest(server http.Server, ctx context.Context) { // Stop processing new messages wsSrv.deactivatedStatus = true // This server wsAltSrv.deactivatedStatus = false // Other server; We gotta turn it on to accept connections and whatnot + log.Printf("Server \"not accepting connections\" status: [Server 0: %v, Server 1: %v]", wsServers[0].deactivatedStatus, wsServers[1].deactivatedStatus) - log.Printf("Connections: %v", len(wsSrv.connections)) + if debug { + log.Printf("Connections at time of close: %v", len(wsSrv.connections)) + } // Send reconnect notices for _, c := range wsSrv.connections { @@ -281,8 +307,7 @@ func activateReconnectTest(server http.Server, ctx context.Context) { } log.Printf("Reconnect notices sent for server %v", serverId) - log.Printf("Server \"not accepting connections\" status: [Server 0: %v, Server 1: %v]", wsServers[0].deactivatedStatus, wsServers[1].deactivatedStatus) - log.Printf("Use this URL for connections: %v", wsAltSrv.connectionUrl) + log.Printf("Use this new URL for connections: %v", wsAltSrv.connectionUrl) // TODO: Transfer subscriptions to the other websocket server. @@ -338,8 +363,24 @@ func StartServer(port int, enableDebug bool, reconnectTestTimer int, sslEnabled wsSrv2Url = fmt.Sprintf("wss://localhost:%v/eventsub", port+1) } - wsServers[0] = &WebsocketServer{0, util.RandomGUID(), wsSrv1Url, []*WebsocketConnection{}, false} - wsServers[1] = &WebsocketServer{1, util.RandomGUID(), wsSrv2Url, []*WebsocketConnection{}, true} + wsServers[0] = &WebsocketServer{ + serverId: 0, + websocketId: util.RandomGUID(), + connectionUrl: wsSrv1Url, + connections: []*WebsocketConnection{}, + deactivatedStatus: false, + reconnectTestTimeout: reconnectTestTimer, + firstClientConnected: false, + } + wsServers[1] = &WebsocketServer{ + serverId: 1, + websocketId: util.RandomGUID(), + connectionUrl: wsSrv2Url, + connections: []*WebsocketConnection{}, + deactivatedStatus: true, // 2nd server is deactivated by default. Will reactivate for reconnect testing. + reconnectTestTimeout: 0, // No reconnect testing + firstClientConnected: false, + } RegisterHandlers(m) @@ -379,17 +420,6 @@ func StartIndividualServer(port int, reconnectTestTimer int, sslEnabled bool, m go func() { log.Printf("Mock EventSub websocket server started on port %d", port) - go func() { - if reconnectTestTimer != 0 { - log.Printf("Reconnect testing enabled. Will be sent in %d seconds.", reconnectTestTimer) - - select { - case <-time.After(time.Second * time.Duration(reconnectTestTimer)): - activateReconnectTest(s, ctx) - } - } - }() - if sslEnabled { // Open HTTP server with HTTPS support home, _ := util.GetApplicationDir() crtFile := filepath.Join(home, "localhost.crt") @@ -460,17 +490,17 @@ window.addEventListener("load", function(evt) { ws = new WebSocket(wsUrl); ws.onopen = function(evt) { - print("OPEN"); + print("OPEN - " + ws.url); } ws.onclose = function(evt) { - print("CLOSE"); + print("CLOSE - " + ws.url); ws = null; } ws.onmessage = function(evt) { - print("RECEIVED: " + evt.data); + print("[RECEIVED / " + new Date().toISOString() + "]: " + evt.data); } ws.onerror = function(evt) { - print("ERROR: " + evt.data); + print("[ERROR / " + new Date().toISOString() + "]: " + evt.data); console.error("ERROR", evt); } return false; @@ -490,6 +520,9 @@ window.addEventListener("load", function(evt) { ws.close(); return false; }; + document.getElementById("clear").onclick = function(evt) { + output.innerHTML = ""; + } }); @@ -505,7 +538,8 @@ You can change the message and send multiple times.

- +

+