Skip to content

Commit

Permalink
node name as id
Browse files Browse the repository at this point in the history
  • Loading branch information
sgalsaleh committed Nov 21, 2024
1 parent 45e7609 commit 00d51e6
Showing 1 changed file with 33 additions and 33 deletions.
66 changes: 33 additions & 33 deletions pkg/handlers/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ type ConnectToECWebsocketResponse struct {
func (h *Handler) ConnectToECWebsocket(w http.ResponseWriter, r *http.Request) {
response := ConnectToECWebsocketResponse{}

clientID := r.URL.Query().Get("id")
if clientID == "" {
response.Error = "missing client id"
nodeName := r.URL.Query().Get("nodeName")
if nodeName == "" {
response.Error = "missing node name"
logger.Error(errors.New(response.Error))
JSON(w, http.StatusBadRequest, response)
return
Expand All @@ -41,98 +41,98 @@ func (h *Handler) ConnectToECWebsocket(w http.ResponseWriter, r *http.Request) {
}
defer conn.Close()

conn.SetPingHandler(wsPingHandler(clientID, conn))
conn.SetPongHandler(wsPongHandler(clientID, conn))
conn.SetCloseHandler(wsCloseHandler(clientID, conn))
conn.SetPingHandler(wsPingHandler(nodeName, conn))
conn.SetPongHandler(wsPongHandler(nodeName, conn))
conn.SetCloseHandler(wsCloseHandler(nodeName, conn))

// register the client
registerWSClient(clientID, conn)
registerWSClient(nodeName, conn)

// ping client on a regular interval to make sure it's still connected
go pingWSClient(clientID, conn)
go pingWSClient(nodeName, conn)

// listen to client messages
listenToWSClient(clientID, conn)
listenToWSClient(nodeName, conn)
}

func pingWSClient(id string, conn *websocket.Conn) {
func pingWSClient(nodeName string, conn *websocket.Conn) {
for {
sleepDuration := time.Second * time.Duration(5+rand.Intn(16)) // 5-20 seconds
time.Sleep(sleepDuration)

pingMsg := fmt.Sprintf("%d", rand.Intn(1000))
logger.Infof("Sending ping message '%s' to client %s", pingMsg, id)
logger.Infof("Sending ping message '%s' to %s", pingMsg, nodeName)

if err := conn.WriteControl(websocket.PingMessage, []byte(pingMsg), time.Now().Add(1*time.Second)); err != nil {
if isWSConnClosed(id, err) {
if isWSConnClosed(nodeName, err) {
return
}
logger.Errorf("Failed to send ping message to client %s: %v", id, err)
logger.Errorf("Failed to send ping message to %s: %v", nodeName, err)
}
}
}

func listenToWSClient(id string, conn *websocket.Conn) {
func listenToWSClient(nodeName string, conn *websocket.Conn) {
for {
_, _, err := conn.ReadMessage() // this is required to receive ping/pong messages
if err != nil {
if isWSConnClosed(id, err) {
if isWSConnClosed(nodeName, err) {
return
}
logger.Errorf("Error reading websocket message from client %s: %v", id, err)
logger.Errorf("Error reading websocket message from %s: %v", nodeName, err)
}
}
}

func registerWSClient(id string, conn *websocket.Conn) {
func registerWSClient(nodeName string, conn *websocket.Conn) {
wsMutex.Lock()
defer wsMutex.Unlock()

if existingConn, ok := wsClients[id]; ok {
if existingConn, ok := wsClients[nodeName]; ok {
existingConn.Close()
delete(wsClients, id)
delete(wsClients, nodeName)
}
wsClients[id] = conn
wsClients[nodeName] = conn

logger.Infof("Registered new websocket client %s", id)
logger.Infof("Registered new websocket for %s", nodeName)
}

func wsPingHandler(id string, conn *websocket.Conn) func(message string) error {
func wsPingHandler(nodeName string, conn *websocket.Conn) func(message string) error {
return func(message string) error {
logger.Infof("Received ping message '%s' from client %s", message, id)
logger.Infof("Sending pong message '%s' to client %s", message, id)
logger.Infof("Received ping message '%s' from %s", message, nodeName)
logger.Infof("Sending pong message '%s' to %s", message, nodeName)
if err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(1*time.Second)); err != nil {
logger.Errorf("Failed to send pong message to client %s: %v", id, err)
logger.Errorf("Failed to send pong message to %s: %v", nodeName, err)
}
return nil
}
}

func wsPongHandler(id string, conn *websocket.Conn) func(message string) error {
func wsPongHandler(nodeName string, conn *websocket.Conn) func(message string) error {
return func(message string) error {
logger.Infof("Received pong message '%s' from client %s", message, id)
logger.Infof("Received pong message '%s' from %s", message, nodeName)
return nil
}
}

func wsCloseHandler(id string, conn *websocket.Conn) func(code int, text string) error {
func wsCloseHandler(nodeName string, conn *websocket.Conn) func(code int, text string) error {
return func(code int, text string) error {
logger.Errorf("Websocket connection closed for client %s: %d (exit code), message: %q", id, code, text)
logger.Errorf("Websocket connection closed for %s: %d (exit code), message: %q", nodeName, code, text)

wsMutex.Lock()
delete(wsClients, id)
delete(wsClients, nodeName)
wsMutex.Unlock()

closeMessage := websocket.FormatCloseMessage(code, text)
if err := conn.WriteControl(websocket.CloseMessage, closeMessage, time.Now().Add(time.Second)); err != nil {
logger.Errorf("Failed to send close message to client %s: %v", id, err)
logger.Errorf("Failed to send close message to %s: %v", nodeName, err)
}
return nil
}
}

func isWSConnClosed(id string, err error) bool {
if _, ok := wsClients[id]; !ok {
func isWSConnClosed(nodeName string, err error) bool {
if _, ok := wsClients[nodeName]; !ok {
return true
}
if _, ok := err.(*websocket.CloseError); ok {
Expand Down

0 comments on commit 00d51e6

Please sign in to comment.