From 2cbc4e12715dbfcaba08b723947b6ecea48872a9 Mon Sep 17 00:00:00 2001 From: Ehsan Noureddin Moosa Date: Thu, 28 Nov 2024 07:39:03 +0300 Subject: [PATCH] Update gnet to v2.6.2 and add WebSocket stability check Upgraded gnet dependency from v2.6.1 to v2.6.2 across relevant modules. Introduced a new test for WebSocket stability and implemented a timeout mechanism for WebSocket requests. Adjusted Echo service response handling to improve clarity. --- std/gateways/fastws/go.mod | 2 +- std/gateways/fastws/go.sum | 4 +-- stub/stub_ws.go | 17 ++++++++-- testenv/go.mod | 16 ++++----- testenv/services/echo.go | 2 +- testenv/stub_test.go | 68 +++++++++++++++++++++++++++++++++++++- 6 files changed, 94 insertions(+), 15 deletions(-) diff --git a/std/gateways/fastws/go.mod b/std/gateways/fastws/go.mod index 7dd25b2b..1ef7ca8f 100644 --- a/std/gateways/fastws/go.mod +++ b/std/gateways/fastws/go.mod @@ -5,7 +5,7 @@ go 1.20 require ( github.com/clubpay/ronykit/kit v0.17.19 github.com/gobwas/ws v1.4.0 - github.com/panjf2000/gnet/v2 v2.6.1 + github.com/panjf2000/gnet/v2 v2.6.2 ) require ( diff --git a/std/gateways/fastws/go.sum b/std/gateways/fastws/go.sum index cd8fba5c..5bed540a 100644 --- a/std/gateways/fastws/go.sum +++ b/std/gateways/fastws/go.sum @@ -25,8 +25,8 @@ github.com/onsi/ginkgo/v2 v2.20.0 h1:PE84V2mHqoT1sglvHc8ZdQtPcwmvvt29WLEEO3xmdZw github.com/onsi/gomega v1.34.1 h1:EUMJIKUjM8sKjYbtxQI9A4z2o+rruxnzNvpknOXie6k= github.com/panjf2000/ants/v2 v2.10.0 h1:zhRg1pQUtkyRiOFo2Sbqwjp0GfBNo9cUY2/Grpx1p+8= github.com/panjf2000/ants/v2 v2.10.0/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I= -github.com/panjf2000/gnet/v2 v2.6.1 h1:IPkv4ZHwdPSqdKve6A4v0PsbieeK6gooIhFfk5GFBgo= -github.com/panjf2000/gnet/v2 v2.6.1/go.mod h1:HpNv+iQrIOeil1eyhdnKDlui7jivyMf0K3xwaeHKnh8= +github.com/panjf2000/gnet/v2 v2.6.2 h1:f6WOlfiaMtblK5RvuiXiAraDlawS0RvoI2LSE4ZaAWc= +github.com/panjf2000/gnet/v2 v2.6.2/go.mod h1:HpNv+iQrIOeil1eyhdnKDlui7jivyMf0K3xwaeHKnh8= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= diff --git a/stub/stub_ws.go b/stub/stub_ws.go index a4648c9e..17cfc976 100644 --- a/stub/stub_ws.go +++ b/stub/stub_ws.go @@ -2,6 +2,7 @@ package stub import ( "context" + "errors" "fmt" "net" "strings" @@ -188,7 +189,7 @@ func (wCtx *WebsocketCtx) receiver(c *websocket.Conn) { continue } - // if this is a reply message we return it to the pending channel + // if this is a reply message, we return it to the pending channel wCtx.pendingMtx.Lock() ch, ok := wCtx.pending[rpcIn.GetID()] wCtx.pendingMtx.Unlock() @@ -322,6 +323,9 @@ type WebsocketRequest struct { // If this is nil, the response will be ignored. However, the response will be caught by // the default handler if it is set. Callback RPCMessageHandler + // Timeout if is set, then the callback will be called with ErrTimeout, in case of we didn't + // receive the response in time. + Timeout time.Duration } const ( @@ -369,7 +373,7 @@ func (wCtx *WebsocketCtx) Do(ctx context.Context, req WebsocketRequest) error { outC.Release() if req.Callback != nil { - go wCtx.waitForMessage(ctx, req.ID, req.ResMsg, req.Callback) + go wCtx.waitForMessage(ctx, req.ID, req.ResMsg, req.Callback, req.Timeout) } return nil @@ -377,18 +381,26 @@ func (wCtx *WebsocketCtx) Do(ctx context.Context, req WebsocketRequest) error { func (wCtx *WebsocketCtx) waitForMessage( ctx context.Context, id string, res kit.Message, cb RPCMessageHandler, + timeout time.Duration, ) { resCh := make(chan kit.IncomingRPCContainer, 1) wCtx.pendingMtx.Lock() wCtx.pending[id] = resCh wCtx.pendingMtx.Unlock() + if timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + select { case c := <-resCh: err := c.ExtractMessage(res) cb(ctx, res, c.GetHdrMap(), err) case <-ctx.Done(): + cb(ctx, res, nil, ErrTimeout) } wCtx.pendingMtx.Lock() @@ -412,4 +424,5 @@ func (c containerTraceCarrier) Set(key string, value string) { var ( ErrBadHandshake = websocket.ErrBadHandshake _ = ErrBadHandshake + ErrTimeout = errors.New("timeout") ) diff --git a/testenv/go.mod b/testenv/go.mod index 51c0f7eb..2d125271 100644 --- a/testenv/go.mod +++ b/testenv/go.mod @@ -3,15 +3,15 @@ module ronykit/testenv go 1.22 require ( - github.com/clubpay/ronykit/kit v0.17.10 - github.com/clubpay/ronykit/std/clusters/p2pcluster v0.17.10 - github.com/clubpay/ronykit/std/clusters/rediscluster v0.17.10 - github.com/clubpay/ronykit/std/gateways/fasthttp v0.17.10 - github.com/clubpay/ronykit/std/gateways/fastws v0.17.10 - github.com/clubpay/ronykit/stub v0.17.10 + github.com/clubpay/ronykit/kit v0.17.19 + github.com/clubpay/ronykit/std/clusters/p2pcluster v0.17.19 + github.com/clubpay/ronykit/std/clusters/rediscluster v0.17.19 + github.com/clubpay/ronykit/std/gateways/fasthttp v0.17.19 + github.com/clubpay/ronykit/std/gateways/fastws v0.17.19 + github.com/clubpay/ronykit/stub v0.17.19 github.com/orlangure/gnomock v0.31.0 - github.com/redis/go-redis/v9 v9.6.1 + github.com/redis/go-redis/v9 v9.7.0 github.com/smartystreets/goconvey v1.8.1 - go.uber.org/fx v1.22.2 + go.uber.org/fx v1.23.0 ) diff --git a/testenv/services/echo.go b/testenv/services/echo.go index de94213c..b00d16dd 100644 --- a/testenv/services/echo.go +++ b/testenv/services/echo.go @@ -48,7 +48,7 @@ var EchoService kit.ServiceBuilder = desc.NewService("EchoService"). func(ctx *kit.Context) { req, _ := ctx.In().GetMsg().(*EchoRequest) - ctx.Out(). + ctx.In().Reply(). SetMsg( &EchoResponse{ Embedded: req.Embedded, diff --git a/testenv/stub_test.go b/testenv/stub_test.go index ac29809d..3ca115a1 100644 --- a/testenv/stub_test.go +++ b/testenv/stub_test.go @@ -6,7 +6,9 @@ import ( "fmt" "math/rand" "net/http" + "sync" "testing" + "time" "ronykit/testenv/services" @@ -175,7 +177,8 @@ func stubWithAutoRun2(t *testing.T, opt fx.Option) func(c C) { func TestWebsocket(t *testing.T) { Convey("Websocket", t, func(c C) { testCases := map[string]func(t *testing.T, opt fx.Option) func(c C){ - "Websocket Stub [Connect, Reconnect, Disconnect]": stubWebsocket, + //"Websocket Stub [Connect, Reconnect, Disconnect]": stubWebsocket, + "Stability Check": stubWebsocketStability, } for title, fn := range testCases { Convey(title+"FastHTTP", @@ -238,6 +241,69 @@ func stubWebsocket(t *testing.T, opt fx.Option) func(c C) { } } +func stubWebsocketStability(t *testing.T, opt fx.Option) func(c C) { + ctx := context.Background() + + return func(c C) { + Prepare( + t, c, + fx.Options( + opt, + ), + ) + + time.Sleep(1 * time.Second) + wsCtx := stub.New("127.0.0.1:8082", stub.WithLogger(&kitLogger{})). + Websocket( + stub.WithPredicateKey("cmd"), + stub.WithPingTime(time.Second*5), + ) + + err := wsCtx.Connect(ctx, "/agent/ws") + c.So(err, ShouldBeNil) + + wg := sync.WaitGroup{} + for range 200 { + X := utils.RandomID(10) + XP := utils.RandomID(10) + + // Set Key to instance 1 + resp := &services.EchoResponse{} + + wg.Add(1) + err = wsCtx.Do( + ctx, + stub.WebsocketRequest{ + Predicate: "echo", + MessageType: stub.WebsocketText, + ReqMsg: &services.EchoRequest{ + Embedded: services.Embedded{ + X: X, + XP: XP, + }, + Input: XP, + }, + ResMsg: resp, + ReqHdr: nil, + Callback: func(ctx context.Context, msg kit.Message, hdr stub.Header, err error) { + if err != nil { + c.So(err, ShouldEqual, stub.ErrTimeout) + } else { + c.So(resp.X, ShouldEqual, X) + c.So(resp.XP, ShouldEqual, XP) + } + + wg.Done() + }, + Timeout: time.Second, + }, + ) + c.So(err, ShouldBeNil) + wg.Wait() + } + } +} + func TestHttp(t *testing.T) { Convey("HTTP", t, func(c C) { testCases := map[string]func(t *testing.T, opt fx.Option) func(c C){