Skip to content

Commit

Permalink
WIP: websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
yzqzss committed Oct 25, 2024
1 parent 8515dc5 commit fabef96
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 80 deletions.
86 changes: 6 additions & 80 deletions altcrawlhq_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,117 +2,40 @@ package altcrawlhqserver

import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"time"

"git.archive.org/wb/gocrawlhq"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

func isAuthorized(c *gin.Context) bool {
authKey := c.GetHeader("X-Auth-Key")
authSecret := c.GetHeader("X-Auth-Secret")
identifier := c.GetHeader("X-Identifier")
// identifier := c.GetHeader("X-Identifier")

if authKey == "" || authSecret == "" {
return false
}

if identifier == "" {
return false
}

if authKey == "saveweb_key" && authSecret == "saveweb_sec" {
return true
}

return false
}

func websocketHandler(c *gin.Context) {
if !isAuthorized(c) {
c.JSON(http.StatusUnauthorized, gin.H{
"error": "Unauthorized",
})
return
}
upGrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}

ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
panic(err)
}

defer func() {
closeSocketErr := ws.Close()
if closeSocketErr != nil {
panic(err)
}
}()

for {
wsMsgType, wsMsg, err := ws.ReadMessage()
if err != nil {
panic(err)
}
fmt.Printf("Message Type: %d, Message: %s\n", wsMsgType, string(wsMsg))

if wsMsgType != websocket.TextMessage {
panic("Message type is not text")
}

// {"type":"identify","payload":`+string(marshalled)+`}`
msgType := struct {
Type string `json:"type"`
}{}
if err := json.Unmarshal(wsMsg, &msgType); err != nil {
panic(err)
}

if msgType.Type != "identify" {
panic("Message type is not identify")
}

identifyMessage := struct {
Payload gocrawlhq.IdentifyMessage `json:"payload"`
}{}
if err := json.Unmarshal(wsMsg, &identifyMessage); err != nil {
panic(err)
}

fmt.Printf("Identify Message: %+v\n", identifyMessage)

err = ws.WriteJSON(struct {
Reply string `json:"reply"`
}{
Reply: "Echo...",
})
if err != nil {
panic(err)
}
}
}

type FeedRequest struct {
Size int `json:"size"`
Strategy string `json:"strategy"`
}

var MONGODB_URI string = os.Getenv("MONGODB_URI")

var mongoClient *mongo.Client
var mongoDatabase *mongo.Database

func connect_to_mongodb() {
fmt.Println("Connecting to MongoDB...")
Expand All @@ -130,8 +53,10 @@ func connect_to_mongodb() {
if err != nil {
panic(err)
}
mongoClient = client
fmt.Println("Connected to MongoDB!")

db := client.Database("crawlhq")
mongoDatabase = db
}

func init() {
Expand Down Expand Up @@ -161,6 +86,7 @@ func ServeHTTP() {
projectGroup.POST("/discovered", discoveredHandler)
}
apiGroup.GET("/ws", websocketHandler)
apiGroup.GET("/online", onlineClientsHandler)
}
if err := g.Run(); err != nil {
panic(err)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
git.archive.org/wb/gocrawlhq v1.2.5
github.com/gin-gonic/gin v1.10.0
github.com/gorilla/websocket v1.5.3
github.com/jellydator/ttlcache/v3 v3.3.0
go.mongodb.org/mongo-driver v1.16.1
)

Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
Expand Down Expand Up @@ -89,6 +91,8 @@ github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7Jul
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.mongodb.org/mongo-driver v1.16.1 h1:rIVLL3q0IHM39dvE+z2ulZLp9ENZKThVfuvN/IiN4l8=
go.mongodb.org/mongo-driver v1.16.1/go.mod h1:oB6AhJQvFQL4LEHyXi6aJzQJtBiTQHiAd83l0GdFaiw=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/arch v0.9.0 h1:ub9TgUInamJ8mrZIGlBG6/4TqWeMszd4N8lNorbrr6k=
golang.org/x/arch v0.9.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
111 changes: 111 additions & 0 deletions ws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package altcrawlhqserver

import (
"encoding/json"
"fmt"
"net/http"
"time"

"git.archive.org/wb/gocrawlhq"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/jellydator/ttlcache/v3"
)

var onlineClientsStats = ttlcache.New[string, gocrawlhq.IdentifyMessage](
ttlcache.WithTTL[string, gocrawlhq.IdentifyMessage](time.Minute*1),
ttlcache.WithDisableTouchOnHit[string, gocrawlhq.IdentifyMessage](),
)

func init() {
fmt.Println("Initializing onlineClientsStats...")
go onlineClientsStats.Start()
fmt.Println("Initialized onlineClientsStats!")
}

func onlineClientsHandler(c *gin.Context) {
if !isAuthorized(c) {
c.JSON(http.StatusUnauthorized, gin.H{
"error": "Unauthorized",
})
return
}

clients := make([]gocrawlhq.IdentifyMessage, 0)
for _, client := range onlineClientsStats.Items() {
clients = append(clients, client.Value())
}
c.JSON(http.StatusOK, clients)
}

func websocketHandler(c *gin.Context) {
if !isAuthorized(c) {
c.JSON(http.StatusUnauthorized, gin.H{
"error": "Unauthorized",
})
return
}
upGrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}

ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
panic(err)
}

defer func() {
closeSocketErr := ws.Close()
if closeSocketErr != nil {
panic(err)
}
}()

for {
wsMsgType, wsMsg, err := ws.ReadMessage()
if err != nil {
panic(err)
}
fmt.Printf("Message Type: %d, Message: %s\n", wsMsgType, string(wsMsg))

if wsMsgType != websocket.TextMessage {
panic("Message type is not text")
}

// {"type":"identify","payload":`+string(marshalled)+`}`
msgType := struct {
Type string `json:"type"`
}{}
if err := json.Unmarshal(wsMsg, &msgType); err != nil {
panic(err)
}

if msgType.Type != "identify" {
panic("Message type is not identify")
}

identifyMessage := struct {
Payload gocrawlhq.IdentifyMessage `json:"payload"`
}{}
if err := json.Unmarshal(wsMsg, &identifyMessage); err != nil {
panic(err)
}

onlineClientsStats.Set(identifyMessage.Payload.Identifier, identifyMessage.Payload, ttlcache.DefaultTTL)

fmt.Printf("Identify Message: %+v\n", identifyMessage)

err = ws.WriteJSON(struct {
Reply string `json:"reply"`
}{
Reply: "Echo...",
})
if err != nil {
panic(err)
}
}
}

0 comments on commit fabef96

Please sign in to comment.