Skip to content

Commit

Permalink
added websocket runner
Browse files Browse the repository at this point in the history
  • Loading branch information
apoorvyadav1111 committed Dec 4, 2024
1 parent aa4dbf5 commit e727f2f
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 33 deletions.
16 changes: 8 additions & 8 deletions integration_tests/commands/tests/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ func TestHttpCommands(t *testing.T) {
assert.Equal(t, test.Output[idx], output)
}
}
})
if len(test.Cleanup) > 0 {
// join all the keys to be cleaned up
keys := ""
for _, key := range test.Cleanup {
keys += key
if len(test.Cleanup) > 0 {
// join all the keys to be cleaned up
keys := ""
for _, key := range test.Cleanup {
keys += key
}
parsers.HttpCommandExecuter(exec, `DEL `+keys)
}
parsers.HttpCommandExecuter(exec, `DEL `+keys)
}
})

}
}
8 changes: 8 additions & 0 deletions integration_tests/commands/tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ func TestMain(m *testing.M) {
httpOpts := servers.TestServerOptions{
Port: 8083,
}
wsOpts := servers.TestServerOptions{
Port: 8380,
}

wg.Add(1)
go func() {
Expand All @@ -33,6 +36,11 @@ func TestMain(m *testing.M) {
}()

//TODO: RunWebSocketServer
wg.Add(1)
go func() {
defer wg.Done()
servers.RunWebsocketServer(ctx, &wg, wsOpts)
}()

// Wait for the server to start
time.Sleep(2 * time.Second)
Expand Down
13 changes: 13 additions & 0 deletions integration_tests/commands/tests/parsers/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package parsers

func ParseResponse(response interface{}) interface{} {
// convert the output to the int64 if it is float64
switch response.(type) {

Check failure on line 5 in integration_tests/commands/tests/parsers/common.go

View workflow job for this annotation

GitHub Actions / lint

typeSwitchVar: 1 case can benefit from type switch with assignment (gocritic)

Check failure on line 5 in integration_tests/commands/tests/parsers/common.go

View workflow job for this annotation

GitHub Actions / lint

S1034: assigning the result of this type assertion to a variable (switch response := response.(type)) could eliminate type assertions in switch cases (gosimple)
case float64:
return int64(response.(float64))

Check failure on line 7 in integration_tests/commands/tests/parsers/common.go

View workflow job for this annotation

GitHub Actions / lint

S1034(related information): could eliminate this type assertion (gosimple)
case nil:
return "(nil)"
default:
return response
}
}
12 changes: 0 additions & 12 deletions integration_tests/commands/tests/parsers/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,6 @@ import (
"github.com/dicedb/dice/integration_tests/commands/tests/servers"
)

func ParseResponse(response interface{}) interface{} {
// convert the output to the int64 if it is float64
switch response.(type) {
case float64:
return int64(response.(float64))
case nil:
return "(nil)"
default:
return response
}
}

func HttpCommandExecuter(exec *servers.HTTPCommandExecutor, cmd string) (interface{}, error) {

Check failure on line 9 in integration_tests/commands/tests/parsers/http.go

View workflow job for this annotation

GitHub Actions / lint

ST1003: func HttpCommandExecuter should be HTTPCommandExecuter (stylecheck)
// convert the command to a HTTPCommand
// cmd starts with Command and Body is values after that
Expand Down
40 changes: 40 additions & 0 deletions integration_tests/commands/tests/parsers/websocket.go
Original file line number Diff line number Diff line change
@@ -1 +1,41 @@
package parsers

import (
"encoding/json"
"fmt"

"github.com/gorilla/websocket"
)

func FireWSCommandAndReadResponse(conn *websocket.Conn, cmd string) (interface{}, error) {
err := FireWSCommand(conn, cmd)
if err != nil {
return nil, err
}

// read the response
_, resp, err := conn.ReadMessage()
if err != nil {
return nil, err
}

// marshal to json
var respJSON interface{}
if err = json.Unmarshal(resp, &respJSON); err != nil {
return nil, fmt.Errorf("error unmarshaling response")
}
fmt.Println("Response: ", respJSON)
respJSON = ParseResponse(respJSON)

return respJSON, nil
}

func FireWSCommand(conn *websocket.Conn, cmd string) error {
// send request
err := conn.WriteMessage(websocket.TextMessage, []byte(cmd))
if err != nil {
return err
}

return nil
}
8 changes: 4 additions & 4 deletions integration_tests/commands/tests/resp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func TestRespCommands(t *testing.T) {
output := parsers.RespCommandExecuter(conn, cmd)
assert.Equal(t, test.Output[idx], output)
}
for _, key := range test.Cleanup {
cmd := "DEL " + key
_ = parsers.RespCommandExecuter(conn, cmd)
}
})

for _, key := range test.Cleanup {
cmd := "DEL " + key
_ = parsers.RespCommandExecuter(conn, cmd)
}
}
}
6 changes: 6 additions & 0 deletions integration_tests/commands/tests/servers/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package servers

type TestServerOptions struct {
Port int
MaxClients int32
}
5 changes: 0 additions & 5 deletions integration_tests/commands/tests/servers/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ import (
dstore "github.com/dicedb/dice/internal/store"
)

type CommandExecutor interface {
FireCommand(cmd string) interface{}
Name() string
}

type HTTPCommandExecutor struct {
httpClient *http.Client
baseURL string
Expand Down
4 changes: 0 additions & 4 deletions integration_tests/commands/tests/servers/resp.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ import (
"github.com/dicedb/dice/internal/watchmanager"
)

type TestServerOptions struct {
Port int
MaxClients int32
}

func GetRespConn() net.Conn {
conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", config.DiceConfig.RespServer.Port))
Expand Down
105 changes: 105 additions & 0 deletions integration_tests/commands/tests/servers/websocket.go
Original file line number Diff line number Diff line change
@@ -1 +1,106 @@
package servers

import (
"context"
"errors"
"log/slog"
"net/http"
"sync"
"time"

"github.com/dicedb/dice/config"
derrors "github.com/dicedb/dice/internal/errors"
"github.com/dicedb/dice/internal/querymanager"
"github.com/dicedb/dice/internal/server"
"github.com/dicedb/dice/internal/shard"
dstore "github.com/dicedb/dice/internal/store"
"github.com/gorilla/websocket"
)

const (
URL = "ws://localhost:8380"
testPort1 = 8380
testPort2 = 8381
)

type WebsocketCommandExecutor struct {
baseURL string
websocketClient *http.Client
upgrader websocket.Upgrader
}

func NewWebsocketCommandExecutor() *WebsocketCommandExecutor {
return &WebsocketCommandExecutor{
baseURL: URL,
websocketClient: &http.Client{
Timeout: time.Second * 100,
},
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
},
}
}

func (e *WebsocketCommandExecutor) ConnectToServer() *websocket.Conn {
// connect with Websocket Server
conn, resp, err := websocket.DefaultDialer.Dial(URL, nil)
if err != nil {
return nil
}
if resp != nil {
resp.Body.Close()
}
return conn
}

func (e *WebsocketCommandExecutor) FireCommand(conn *websocket.Conn, cmd string) error {
// send request
err := conn.WriteMessage(websocket.TextMessage, []byte(cmd))
if err != nil {
return err
}

return nil
}

func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOptions) {
config.DiceConfig.Network.IOBufferLength = 16
config.DiceConfig.Persistence.WriteAOFOnCleanup = false

// Initialize WebsocketServer
globalErrChannel := make(chan error)
watchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Performance.WatchChanBufSize)
shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel)
queryWatcherLocal := querymanager.NewQueryManager()
config.DiceConfig.WebSocket.Port = opt.Port
testServer := server.NewWebSocketServer(shardManager, testPort1, nil)
shardManagerCtx, cancelShardManager := context.WithCancel(ctx)

// run shard manager
wg.Add(1)
go func() {
defer wg.Done()
shardManager.Run(shardManagerCtx)
}()

// run query manager
wg.Add(1)
go func() {
defer wg.Done()
queryWatcherLocal.Run(ctx, watchChan)
}()

// start websocket server
wg.Add(1)
go func() {
defer wg.Done()
srverr := testServer.Run(ctx)
if srverr != nil {
cancelShardManager()
if errors.Is(srverr, derrors.ErrAborted) {
return
}
slog.Debug("Websocket test server encountered an error: %v", slog.Any("error", srverr))
}
}()
}
69 changes: 69 additions & 0 deletions integration_tests/commands/tests/websocket_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package tests

import (
"fmt"
"log"
"testing"
"time"

"github.com/dicedb/dice/config"
"github.com/dicedb/dice/integration_tests/commands/tests/parsers"
"github.com/dicedb/dice/integration_tests/commands/tests/servers"
"github.com/stretchr/testify/assert"
)

func init() {
parser := config.NewConfigParser()
if err := parser.ParseDefaults(config.DiceConfig); err != nil {
log.Fatalf("failed to load configuration: %v", err)
}
}

func TestWebsocketCommands(t *testing.T) {
exec := servers.NewWebsocketCommandExecutor()
allTests := GetAllTests()

conn := exec.ConnectToServer()
if conn == nil {
t.Fatal("Failed to connect to the server")
}

for _, test := range allTests {
t.Run(test.Name, func(t *testing.T) {
if !Validate(&test) {
t.Fatal("Test progression failed...")
}
// Setup commands
if len(test.Setup) > 0 {
for _, setup := range test.Setup {
for idx, cmd := range setup.Input {
output, _ := parsers.FireWSCommandAndReadResponse(conn, cmd)
assert.Equal(t, setup.Output[idx], output)
}
}
}
for idx, cmd := range test.Input {
if len(test.Delays) > 0 {
time.Sleep(test.Delays[idx])
}
output, _ := parsers.FireWSCommandAndReadResponse(conn, cmd)
fmt.Println(cmd, output, test.Output[idx])
if len(test.Assert) > 0 {
SwitchAsserts(t, test.Assert[idx], test.Output[idx], output)
} else {
assert.Equal(t, test.Output[idx], output)
}
}
if len(test.Cleanup) > 0 {
// join all the keys to be cleaned up
keys := ""
for _, key := range test.Cleanup {
keys += key
}
exec.FireCommand(conn, `DEL `+keys)
}
})

}

}

0 comments on commit e727f2f

Please sign in to comment.