Skip to content

Commit

Permalink
Merge pull request #180 from twitchdev/eventsocket-ssl-fix
Browse files Browse the repository at this point in the history
EventSocket SSL Fix
  • Loading branch information
Xemdo authored Oct 20, 2022
2 parents f9d8ee0 + 653d316 commit c4fe31a
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 43 deletions.
11 changes: 9 additions & 2 deletions cmd/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var (
gameID string
debug bool
wssReconnectTest int
sslEnabled bool
)

var eventCmd = &cobra.Command{
Expand Down Expand Up @@ -127,6 +128,7 @@ func init() {
// start-websocket-server flags
startWebsocketServerCmd.Flags().IntVarP(&port, "port", "p", 8080, "Defines the port that the mock EventSub websocket server will run on.")
startWebsocketServerCmd.Flags().BoolVar(&debug, "debug", false, "Set on/off for debug messages for the EventSub WebSocket server.")
startWebsocketServerCmd.Flags().BoolVar(&sslEnabled, "ssl", false, "Sets on/off for SSL. Recommended to keep 'false', as most testing does not require this.")
// TODO: This next flag is temporary, until I create a better way to test reconnecting.
startWebsocketServerCmd.Flags().IntVarP(&wssReconnectTest, "reconnect", "r", 0, "Used to test WebSocket Reconnect message. Sets delay (in seconds) from startup until the reconnect occurs.")
}
Expand Down Expand Up @@ -245,6 +247,11 @@ func verifyCmdRun(cmd *cobra.Command, args []string) {
}

func startWebsocketServerCmdRun(cmd *cobra.Command, args []string) {
log.Printf("Starting mock EventSub WebSocket servers on wss://localhost:%v and wss://localhost:%v", port, port+1)
mock_wss_server.StartServer(port, debug, wssReconnectTest)
wsStr := "ws"
if sslEnabled {
wsStr = "wss"
}

log.Printf("Starting mock EventSub WebSocket servers on %v://localhost:%v and %v://localhost:%v", wsStr, port, wsStr, port+1)
mock_wss_server.StartServer(port, debug, wssReconnectTest, sslEnabled)
}
18 changes: 18 additions & 0 deletions internal/events/mock_wss_server/message_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,21 @@ type ReconnectMessagePayloadSession struct { // <3>
ReconnectUrl string `json:"reconnect_url"`
ConnectedAt string `json:"connected_at"`
}

/* ** Keepalive message **
{ // <1>
"metadata": { // <MessageMetadata>
"message_id": "84c1e79a-2a4b-4c13-ba0b-4312293e9308",
"message_type": "session_keepalive",
"message_timestamp": "2019-11-16T10:11:12.123Z"
},
"payload": {} // struct{}
}
*/

type KeepaliveMessage struct { // <1>
Metadata MessageMetadata `json:"metadata"`
Payload KeepaliveMessagePayload `json:"payload"`
}

type KeepaliveMessagePayload struct{}
169 changes: 128 additions & 41 deletions internal/events/mock_wss_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os/signal"
"path/filepath"
"strconv"
"sync"
"text/template"
"time"

Expand All @@ -25,26 +26,34 @@ import (
* This is a const for now, but it may need to be a flag in the future. */
const MINIMUM_MESSAGE_FREQUENCY_SECONDS = 10

//var connectionURL string

var upgrader = websocket.Upgrader{}
var debug = false

// List of websocket servers. Limited to 2 for now, as allowing more would require rewriting reconnect stuff.
var wsServers = [2]*WebsocketServer{}

type WebsocketServer struct {
serverId int // Int representing the ID of the server (0, 1, 2, ...)
websocketId string // UUID of the websocket. Used for subscribing via EventSub
connectionUrl string // URL used to connect to the websocket. Used for reconnect messages
connections []WebsocketConnection // Current clients connected to this websocket
deactivatedStatus bool // Boolean used for preventing connections/messages during deactivation; Used for reconnect testing
serverId int // Int representing the ID of the server (0, 1, 2, ...)
websocketId string // UUID of the websocket. Used for subscribing via EventSub
connectionUrl string // URL used to connect to the websocket. Used for reconnect messages
connections []*WebsocketConnection // Current clients connected to this websocket
deactivatedStatus bool // Boolean used for preventing connections/messages during deactivation; Used for reconnect testing
}

type WebsocketConnection struct {
clientId string
Conn *websocket.Conn
mu sync.Mutex
connectedAtTimestamp string
pingLoopChan chan struct{}
kaLoopChan chan struct{}
closed bool
}

func (wc *WebsocketConnection) SendMessage(messageType int, data []byte) error {
wc.mu.Lock()
defer wc.mu.Unlock()
return wc.Conn.WriteMessage(messageType, data)
}

func eventsubHandle(w http.ResponseWriter, r *http.Request) {
Expand All @@ -62,7 +71,7 @@ func eventsubHandle(w http.ResponseWriter, r *http.Request) {
// Connection sucessful. WebSocket is open.

serverId, _ := strconv.Atoi(r.Context().Value("serverId").(string))
wsSrv := wsServers[serverId]
wsSrv := *wsServers[serverId]

// Or is it? Check for websocket set to deactivated (due to reconnect), and kick them out if so
if wsSrv.deactivatedStatus {
Expand All @@ -78,15 +87,14 @@ func eventsubHandle(w http.ResponseWriter, r *http.Request) {
conn.SetReadDeadline(time.Now().Add(time.Second * MINIMUM_MESSAGE_FREQUENCY_SECONDS))

// Add to websocket connection list.
wsSrv.connections = append(
wsSrv.connections,
WebsocketConnection{
clientId: util.RandomGUID(),
Conn: conn,
connectedAtTimestamp: connectedAtTimestamp,
},
)
printConnections(*wsSrv)
wc := &WebsocketConnection{
clientId: util.RandomGUID(),
Conn: conn,
connectedAtTimestamp: connectedAtTimestamp,
closed: false,
}
wsSrv.connections = append(wsSrv.connections, wc)
printConnections(wsSrv)

// Send "websocket_welcome" message
welcomeMsg, _ := json.Marshal(
Expand All @@ -107,25 +115,82 @@ func eventsubHandle(w http.ResponseWriter, r *http.Request) {
},
},
)
conn.WriteMessage(1, welcomeMsg)

wc.SendMessage(websocket.TextMessage, welcomeMsg)
if debug {
log.Printf("[DEBUG] Write: %s", welcomeMsg)
}

// TODO: Look to implement a way to shut off pings. This would be used specifically for testing the timeout feature.
// Set up ping/pong handling
pingTicker := time.NewTicker(5 * time.Second)
wc.pingLoopChan = make(chan struct{}) // Also used for keepalive
go func() {
// Set pong handler
// Weirdly, pongs are not seen as messages read by conn.ReadMessage, so we have to reset the deadline manually
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(time.Second * MINIMUM_MESSAGE_FREQUENCY_SECONDS))
return nil
})

// Ping loop
for {
select {
case <-wc.pingLoopChan:
pingTicker.Stop()
return
case <-pingTicker.C:
err := wc.SendMessage(websocket.PingMessage, []byte{})
if err != nil {
onCloseConnection(*wsServers[1], wc)
}
}
}
}()

// Set up keepalive loop
kaTicker := time.NewTicker(10 * time.Second)
wc.kaLoopChan = make(chan struct{})
go func() {
for {
select {
case <-wc.kaLoopChan:
kaTicker.Stop()
case <-kaTicker.C:
keepAliveMsg, _ := json.Marshal(
KeepaliveMessage{
Metadata: MessageMetadata{
MessageID: util.RandomGUID(),
MessageType: "session_keepalive",
MessageTimestamp: time.Now().UTC().Format(time.RFC3339Nano),
},
Payload: KeepaliveMessagePayload{},
},
)
err := wc.SendMessage(websocket.TextMessage, keepAliveMsg)
if err != nil {
onCloseConnection(*wsServers[1], wc)
}

if debug {
log.Printf("[DEBUG] Write: %s", keepAliveMsg)
}
}
}
}()

// TODO: Read messages
for {
conn.SetReadDeadline(time.Now().Add(time.Second * MINIMUM_MESSAGE_FREQUENCY_SECONDS))
mt, message, err := conn.ReadMessage()
if err != nil {
log.Println("read:", err)
onCloseConnection(*wsSrv, conn)
onCloseConnection(wsSrv, wc)
break
}
if debug {
log.Printf("recv: [%d] %s", mt, message)
}
/*err = conn.WriteMessage(mt, message)
/*err = wc.SendMessage(mt, message)
if err != nil {
log.Println("write:", err)
onCloseConnection(websocketId)
Expand All @@ -134,7 +199,12 @@ func eventsubHandle(w http.ResponseWriter, r *http.Request) {
}
}

func onCloseConnection(wsSrv WebsocketServer, conn *websocket.Conn) {
func onCloseConnection(wsSrv WebsocketServer, wc *WebsocketConnection) {
// Close ping loop chan
wc.closed = true
close(wc.pingLoopChan)
close(wc.kaLoopChan)

// Remove from list
c := 0
for i := 0; i < len(wsSrv.connections); i++ {
Expand Down Expand Up @@ -202,7 +272,7 @@ func activateReconnectTest(server http.Server, ctx context.Context) {
},
)

err := c.Conn.WriteMessage(1, reconnectMsg) // Ignore err here because we're shutting down so it doesn't really matter
err := c.SendMessage(websocket.TextMessage, reconnectMsg)
if err != nil {
log.Printf("ERROR (clientId %v): %v", c.clientId, err)
} else {
Expand Down Expand Up @@ -230,7 +300,7 @@ func activateReconnectTest(server http.Server, ctx context.Context) {
}
}

func StartServer(port int, enableDebug bool, reconnectTestTimer int) {
func StartServer(port int, enableDebug bool, reconnectTestTimer int, sslEnabled bool) {
debug = enableDebug

m := http.NewServeMux()
Expand Down Expand Up @@ -259,16 +329,25 @@ func StartServer(port int, enableDebug bool, reconnectTestTimer int) {
ctx1 = context.WithValue(ctx1, "db", db)
ctx2 = context.WithValue(ctx2, "db", db)

wsServers[0] = &WebsocketServer{0, util.RandomGUID(), fmt.Sprintf("wss://localhost:%v/eventsub", port), []WebsocketConnection{}, false}
wsServers[1] = &WebsocketServer{1, util.RandomGUID(), fmt.Sprintf("wss://localhost:%v/eventsub", port+1), []WebsocketConnection{}, true}
wsSrv1Url := fmt.Sprintf("ws://localhost:%v/eventsub", port)
wsSrv2Url := fmt.Sprintf("ws://localhost:%v/eventsub", port+1)

// Change to WSS if SSL is enabled via flag
if sslEnabled {
wsSrv1Url = fmt.Sprintf("wss://localhost:%v/eventsub", port)
wsSrv2Url = fmt.Sprintf("wss://localhost:%v/eventsub", port+1)
}

wsServers[0] = &WebsocketServer{0, util.RandomGUID(), wsSrv1Url, []*WebsocketConnection{}, false}
wsServers[1] = &WebsocketServer{1, util.RandomGUID(), wsSrv2Url, []*WebsocketConnection{}, true}

RegisterHandlers(m)

stop := make(chan os.Signal)
signal.Notify(stop, os.Interrupt)

s1 := StartIndividualServer(port, reconnectTestTimer, m, ctx1)
s2 := StartIndividualServer(port+1, 0, m, ctx2) // Start second server, at a port above. Never has a reconnect timer
s1 := StartIndividualServer(port, reconnectTestTimer, sslEnabled, m, ctx1)
s2 := StartIndividualServer(port+1, 0, sslEnabled, m, ctx2) // Start second server, at a port above. Never has a reconnect timer

<-stop // Wait for ctrl + c

Expand All @@ -286,7 +365,7 @@ func StartServer(port int, enableDebug bool, reconnectTestTimer int) {
}
}

func StartIndividualServer(port int, reconnectTestTimer int, m *http.ServeMux, ctx context.Context) http.Server {
func StartIndividualServer(port int, reconnectTestTimer int, sslEnabled bool, m *http.ServeMux, ctx context.Context) http.Server {
s := http.Server{
Addr: fmt.Sprintf(":%v", port),
Handler: m,
Expand All @@ -311,18 +390,26 @@ func StartIndividualServer(port int, reconnectTestTimer int, m *http.ServeMux, c
}
}()

home, _ := util.GetApplicationDir()
crtFile := filepath.Join(home, "localhost.crt")
keyFile := filepath.Join(home, "localhost.key")

if err := s.ListenAndServeTLS(crtFile, keyFile); err != nil {
if err != http.ErrServerClosed {
log.Fatalf(`%v
** You need to generate localhost.crt and localhost.key for this to work **
** Please run these commands (Note: you'll have a cert error in your web browser, but it'll still start): **
openssl genrsa -out "%v" 2048
openssl req -new -x509 -sha256 -key "%v" -out "%v" -days 3650`,
err, keyFile, keyFile, crtFile)
if sslEnabled { // Open HTTP server with HTTPS support
home, _ := util.GetApplicationDir()
crtFile := filepath.Join(home, "localhost.crt")
keyFile := filepath.Join(home, "localhost.key")

if err := s.ListenAndServeTLS(crtFile, keyFile); err != nil {
if err != http.ErrServerClosed {
log.Fatalf(`%v
** You need to generate localhost.crt and localhost.key for this to work **
** Please run these commands (Note: you'll have a cert error in your web browser, but it'll still start): **
openssl genrsa -out "%v" 2048
openssl req -new -x509 -sha256 -key "%v" -out "%v" -days 3650`,
err, keyFile, keyFile, crtFile)
}
}
} else { // Open HTTP server without HTTPS support
if err := s.ListenAndServe(); err != nil {
if err != http.ErrServerClosed {
log.Fatalf("%v", err)
}
}
}
}()
Expand Down

0 comments on commit c4fe31a

Please sign in to comment.