diff --git a/integration_tests/commands/http/command_getkeys_test.go b/integration_tests/commands/http/command_getkeys_test.go index 906117296..3ebb1074a 100644 --- a/integration_tests/commands/http/command_getkeys_test.go +++ b/integration_tests/commands/http/command_getkeys_test.go @@ -38,13 +38,14 @@ func TestCommandGetKeys(t *testing.T) { }, expected: []interface{}{[]interface{}{"1 2 3 4 5 6 7"}}, }, - { - name: "MSET command", - commands: []HTTPCommand{ - {Command: "COMMAND/GETKEYS", Body: map[string]interface{}{"key": "MSET", "keys": []interface{}{"key1 key2"}, "values": []interface{}{" val1 val2"}}}, - }, - expected: []interface{}{[]interface{}{"key1 key2"}}, - }, + // Skipping these tests until multishards cmds supported by http + //{ + // name: "MSET command", + // commands: []HTTPCommand{ + // {Command: "COMMAND/GETKEYS", Body: map[string]interface{}{"key": "MSET", "keys": []interface{}{"key1 key2"}, "values": []interface{}{" val1 val2"}}}, + // }, + // expected: []interface{}{"ERR invalid command specified"}, + //}, { name: "Expire command", commands: []HTTPCommand{ diff --git a/integration_tests/commands/http/command_rename_test.go b/integration_tests/commands/http/command_rename_test.go index caef9ed8c..0c9845060 100644 --- a/integration_tests/commands/http/command_rename_test.go +++ b/integration_tests/commands/http/command_rename_test.go @@ -1,3 +1,7 @@ +//go:build ignore +// +build ignore + +// Ignored as multishard commands not supported by HTTP package http import ( diff --git a/integration_tests/commands/http/copy_test.go b/integration_tests/commands/http/copy_test.go index 51fdaa41a..fc46da0ce 100644 --- a/integration_tests/commands/http/copy_test.go +++ b/integration_tests/commands/http/copy_test.go @@ -1,3 +1,7 @@ +//go:build ignore +// +build ignore + +// Ignored as multishard commands not supported by HTTP package http import ( diff --git a/integration_tests/commands/http/dbsize_test.go b/integration_tests/commands/http/dbsize_test.go index a76929d66..430abab5e 100644 --- a/integration_tests/commands/http/dbsize_test.go +++ b/integration_tests/commands/http/dbsize_test.go @@ -1,3 +1,7 @@ +//go:build ignore +// +build ignore + +// Ignored as multishard commands not supported by HTTP package http import ( diff --git a/integration_tests/commands/http/json_test.go b/integration_tests/commands/http/json_test.go index 8a3b862ec..ebcee8c07 100644 --- a/integration_tests/commands/http/json_test.go +++ b/integration_tests/commands/http/json_test.go @@ -881,6 +881,7 @@ func TestJsonStrlen(t *testing.T) { } func TestJSONMGET(t *testing.T) { + t.Skip("Skipping this test until multishards cmds supported by http") exec := NewHTTPCommandExecutor() setupData := map[string]string{ "xx": `["hehhhe","hello"]`, diff --git a/integration_tests/commands/http/keys_test.go b/integration_tests/commands/http/keys_test.go index 75c9b05b1..6219a3945 100644 --- a/integration_tests/commands/http/keys_test.go +++ b/integration_tests/commands/http/keys_test.go @@ -1,3 +1,7 @@ +//go:build ignore +// +build ignore + +// Ignored as multishard commands not supported by HTTP package http import ( diff --git a/integration_tests/commands/http/mget_test.go b/integration_tests/commands/http/mget_test.go index 93c97e0a7..ae85f68ff 100644 --- a/integration_tests/commands/http/mget_test.go +++ b/integration_tests/commands/http/mget_test.go @@ -1,3 +1,7 @@ +//go:build ignore +// +build ignore + +// Ignored as multishard commands not supported by HTTP package http import ( diff --git a/integration_tests/commands/http/mset_test.go b/integration_tests/commands/http/mset_test.go index 8b8182e1f..7a0d47f87 100644 --- a/integration_tests/commands/http/mset_test.go +++ b/integration_tests/commands/http/mset_test.go @@ -1,3 +1,7 @@ +//go:build ignore +// +build ignore + +// Ignored as multishard commands not supported by HTTP package http import ( diff --git a/integration_tests/commands/http/object_test.go b/integration_tests/commands/http/object_test.go index e16cdab27..b0b105883 100644 --- a/integration_tests/commands/http/object_test.go +++ b/integration_tests/commands/http/object_test.go @@ -1,3 +1,7 @@ +//go:build ignore +// +build ignore + +// Ignored as multishard commands not supported by HTTP package http import ( diff --git a/integration_tests/commands/http/set_data_cmd_test.go b/integration_tests/commands/http/set_data_cmd_test.go index e850a7569..adbd66bce 100644 --- a/integration_tests/commands/http/set_data_cmd_test.go +++ b/integration_tests/commands/http/set_data_cmd_test.go @@ -159,120 +159,121 @@ func TestSetDataCmd(t *testing.T) { assert_type: []string{"equal", "array", "equal", "array"}, expected: []interface{}{float64(3), []any{string("bar"), string("baz"), string("bax")}, float64(0), []any{string("bar"), string("baz"), string("bax")}}, }, - { - name: "SADD & SDIFF", - commands: []HTTPCommand{ - {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, - {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "baz"}}, - {Command: "SADD", Body: map[string]interface{}{"key": "foo2", "value": "baz"}}, - {Command: "SADD", Body: map[string]interface{}{"key": "foo2", "value": "bax"}}, - {Command: "SDIFF", Body: map[string]interface{}{"values": []interface{}{"foo", "foo2"}}}, - }, - assert_type: []string{"equal", "equal", "equal", "equal", "array"}, - expected: []interface{}{float64(1), float64(1), float64(1), float64(1), []any{string("bar")}}, - }, - { - name: "SADD & SDIFF with non-existing subsequent key", - commands: []HTTPCommand{ - {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, - {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "baz"}}, - {Command: "SDIFF", Body: map[string]interface{}{"values": []interface{}{"foo", "foo2"}}}, - }, - assert_type: []string{"equal", "equal", "array"}, - expected: []interface{}{float64(1), float64(1), []any{string("bar"), string("baz")}}, - }, - { - name: "SADD & SDIFF with wrong key type", - commands: []HTTPCommand{ - {Command: "SET", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, - {Command: "SDIFF", Body: map[string]interface{}{"values": []interface{}{"foo", "foo2"}}}, - }, - assert_type: []string{"equal", "equal"}, - expected: []interface{}{"OK", "WRONGTYPE Operation against a key holding the wrong kind of value"}, - }, - { - name: "SADD & SDIFF with subsequent key of wrong type", - commands: []HTTPCommand{ - {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, - {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "baz"}}, - {Command: "SET", Body: map[string]interface{}{"key": "foo2", "value": "bar"}}, - {Command: "SDIFF", Body: map[string]interface{}{"values": []interface{}{"foo", "foo2"}}}, - }, - assert_type: []string{"equal", "equal", "equal", "equal"}, - expected: []interface{}{float64(1), float64(1), "OK", "WRONGTYPE Operation against a key holding the wrong kind of value"}, - }, - { - name: "SADD & SDIFF with non-existing first key", - commands: []HTTPCommand{ - {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, - {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "baz"}}, - {Command: "SDIFF", Body: map[string]interface{}{"key1": "foo2", "key2": "foo"}}, - }, - assert_type: []string{"equal", "equal", "array"}, - expected: []interface{}{float64(1), float64(1), []any{}}, - }, - { - name: "SADD & SDIFF with one key", - commands: []HTTPCommand{ - {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, - {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "baz"}}, - {Command: "SDIFF", Body: map[string]interface{}{"key": "foo"}}, - }, - assert_type: []string{"equal", "equal", "array"}, - expected: []interface{}{float64(1), float64(1), []any{string("bar"), string("baz")}}, - }, - { - name: "SADD & SINTER", - commands: []HTTPCommand{ - {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, - {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "baz"}}, - {Command: "SADD", Body: map[string]interface{}{"key": "foo2", "value": "baz"}}, - {Command: "SADD", Body: map[string]interface{}{"key": "foo2", "value": "bax"}}, - {Command: "SINTER", Body: map[string]interface{}{"values": []interface{}{"foo", "foo2"}}}, - }, - assert_type: []string{"equal", "equal", "equal", "equal", "array"}, - expected: []interface{}{float64(1), float64(1), float64(1), float64(1), []any{string("baz")}}, - }, - { - name: "SADD & SINTER with non-existing subsequent key", - commands: []HTTPCommand{ - {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, - {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "baz"}}, - {Command: "SINTER", Body: map[string]interface{}{"values": []interface{}{"foo", "foo2"}}}, - }, - assert_type: []string{"equal", "equal", "array"}, - expected: []interface{}{float64(1), float64(1), []any{}}, - }, - { - name: "SADD & SINTER with wrong key type", - commands: []HTTPCommand{ - {Command: "SET", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, - {Command: "SINTER", Body: map[string]interface{}{"values": []interface{}{"foo", "foo2"}}}, - }, - assert_type: []string{"equal", "equal"}, - expected: []interface{}{"OK", "WRONGTYPE Operation against a key holding the wrong kind of value"}, - }, - { - name: "SADD & SINTER with subsequent key of wrong type", - commands: []HTTPCommand{ - {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, - {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "baz"}}, - {Command: "SET", Body: map[string]interface{}{"key": "foo2", "value": "bar"}}, - {Command: "SINTER", Body: map[string]interface{}{"values": []interface{}{"foo", "foo2"}}}, - }, - assert_type: []string{"equal", "equal", "equal", "equal"}, - expected: []interface{}{float64(1), float64(1), "OK", "WRONGTYPE Operation against a key holding the wrong kind of value"}, - }, - { - name: "SADD & SINTER with single key", - commands: []HTTPCommand{ - {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, - {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "baz"}}, - {Command: "SINTER", Body: map[string]interface{}{"values": []interface{}{"foo"}}}, - }, - assert_type: []string{"equal", "equal", "array"}, - expected: []interface{}{float64(1), float64(1), []any{string("bar"), string("baz")}}, - }, + // Skipping these tests until multishards cmds supported by http + //{ + // name: "SADD & SDIFF", + // commands: []HTTPCommand{ + // {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, + // {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "baz"}}, + // {Command: "SADD", Body: map[string]interface{}{"key": "foo2", "value": "baz"}}, + // {Command: "SADD", Body: map[string]interface{}{"key": "foo2", "value": "bax"}}, + // {Command: "SDIFF", Body: map[string]interface{}{"values": []interface{}{"foo", "foo2"}}}, + // }, + // assert_type: []string{"equal", "equal", "equal", "equal", "array"}, + // expected: []interface{}{float64(1), float64(1), float64(1), float64(1), []any{string("bar")}}, + //}, + //{ + // name: "SADD & SDIFF with non-existing subsequent key", + // commands: []HTTPCommand{ + // {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, + // {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "baz"}}, + // {Command: "SDIFF", Body: map[string]interface{}{"values": []interface{}{"foo", "foo2"}}}, + // }, + // assert_type: []string{"equal", "equal", "array"}, + // expected: []interface{}{float64(1), float64(1), []any{string("bar"), string("baz")}}, + //}, + //{ + // name: "SADD & SDIFF with wrong key type", + // commands: []HTTPCommand{ + // {Command: "SET", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, + // {Command: "SDIFF", Body: map[string]interface{}{"values": []interface{}{"foo", "foo2"}}}, + // }, + // assert_type: []string{"equal", "equal"}, + // expected: []interface{}{"OK", "WRONGTYPE Operation against a key holding the wrong kind of value"}, + //}, + //{ + // name: "SADD & SDIFF with subsequent key of wrong type", + // commands: []HTTPCommand{ + // {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, + // {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "baz"}}, + // {Command: "SET", Body: map[string]interface{}{"key": "foo2", "value": "bar"}}, + // {Command: "SDIFF", Body: map[string]interface{}{"values": []interface{}{"foo", "foo2"}}}, + // }, + // assert_type: []string{"equal", "equal", "equal", "equal"}, + // expected: []interface{}{float64(1), float64(1), "OK", "WRONGTYPE Operation against a key holding the wrong kind of value"}, + //}, + //{ + // name: "SADD & SDIFF with non-existing first key", + // commands: []HTTPCommand{ + // {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, + // {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "baz"}}, + // {Command: "SDIFF", Body: map[string]interface{}{"key1": "foo2", "key2": "foo"}}, + // }, + // assert_type: []string{"equal", "equal", "array"}, + // expected: []interface{}{float64(1), float64(1), []any{}}, + //}, + //{ + // name: "SADD & SDIFF with one key", + // commands: []HTTPCommand{ + // {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, + // {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "baz"}}, + // {Command: "SDIFF", Body: map[string]interface{}{"key": "foo"}}, + // }, + // assert_type: []string{"equal", "equal", "array"}, + // expected: []interface{}{float64(1), float64(1), []any{string("bar"), string("baz")}}, + //}, + //{ + // name: "SADD & SINTER", + // commands: []HTTPCommand{ + // {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, + // {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "baz"}}, + // {Command: "SADD", Body: map[string]interface{}{"key": "foo2", "value": "baz"}}, + // {Command: "SADD", Body: map[string]interface{}{"key": "foo2", "value": "bax"}}, + // {Command: "SINTER", Body: map[string]interface{}{"values": []interface{}{"foo", "foo2"}}}, + // }, + // assert_type: []string{"equal", "equal", "equal", "equal", "array"}, + // expected: []interface{}{float64(1), float64(1), float64(1), float64(1), []any{string("baz")}}, + //}, + //{ + // name: "SADD & SINTER with non-existing subsequent key", + // commands: []HTTPCommand{ + // {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, + // {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "baz"}}, + // {Command: "SINTER", Body: map[string]interface{}{"values": []interface{}{"foo", "foo2"}}}, + // }, + // assert_type: []string{"equal", "equal", "array"}, + // expected: []interface{}{float64(1), float64(1), []any{}}, + //}, + //{ + // name: "SADD & SINTER with wrong key type", + // commands: []HTTPCommand{ + // {Command: "SET", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, + // {Command: "SINTER", Body: map[string]interface{}{"values": []interface{}{"foo", "foo2"}}}, + // }, + // assert_type: []string{"equal", "equal"}, + // expected: []interface{}{"OK", "WRONGTYPE Operation against a key holding the wrong kind of value"}, + //}, + //{ + // name: "SADD & SINTER with subsequent key of wrong type", + // commands: []HTTPCommand{ + // {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, + // {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "baz"}}, + // {Command: "SET", Body: map[string]interface{}{"key": "foo2", "value": "bar"}}, + // {Command: "SINTER", Body: map[string]interface{}{"values": []interface{}{"foo", "foo2"}}}, + // }, + // assert_type: []string{"equal", "equal", "equal", "equal"}, + // expected: []interface{}{float64(1), float64(1), "OK", "WRONGTYPE Operation against a key holding the wrong kind of value"}, + //}, + //{ + // name: "SADD & SINTER with single key", + // commands: []HTTPCommand{ + // {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "bar"}}, + // {Command: "SADD", Body: map[string]interface{}{"key": "foo", "value": "baz"}}, + // {Command: "SINTER", Body: map[string]interface{}{"values": []interface{}{"foo"}}}, + // }, + // assert_type: []string{"equal", "equal", "array"}, + // expected: []interface{}{float64(1), float64(1), []any{string("bar"), string("baz")}}, + //}, } defer exec.FireCommand(HTTPCommand{ diff --git a/integration_tests/commands/http/setup.go b/integration_tests/commands/http/setup.go index d27c67fbd..24ef1aa00 100644 --- a/integration_tests/commands/http/setup.go +++ b/integration_tests/commands/http/setup.go @@ -12,12 +12,11 @@ import ( "sync" "time" - "github.com/dicedb/dice/internal/server/utils" + "github.com/dicedb/dice/internal/server/httpws" "github.com/dicedb/dice/config" derrors "github.com/dicedb/dice/internal/errors" "github.com/dicedb/dice/internal/querymanager" - "github.com/dicedb/dice/internal/server" "github.com/dicedb/dice/internal/shard" dstore "github.com/dicedb/dice/internal/store" ) @@ -88,7 +87,7 @@ func (e *HTTPCommandExecutor) FireCommand(cmd HTTPCommand) (interface{}, error) defer resp.Body.Close() if cmd.Command != "Q.WATCH" { - var result utils.HTTPResponse + var result httpws.HTTPResponse err = json.NewDecoder(resp.Body).Decode(&result) if err != nil { return nil, err @@ -119,7 +118,7 @@ func RunHTTPServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption queryWatcherLocal := querymanager.NewQueryManager() config.DiceConfig.HTTP.Port = opt.Port // Initialize the HTTPServer - testServer := server.NewHTTPServer(shardManager, nil) + testServer := httpws.NewHTTPServer(shardManager, nil) // Inform the user that the server is starting fmt.Println("Starting the test server on port", config.DiceConfig.HTTP.Port) shardManagerCtx, cancelShardManager := context.WithCancel(ctx) diff --git a/integration_tests/commands/http/touch_test.go b/integration_tests/commands/http/touch_test.go index 88400e51c..454a24821 100644 --- a/integration_tests/commands/http/touch_test.go +++ b/integration_tests/commands/http/touch_test.go @@ -1,3 +1,7 @@ +//go:build ignore +// +build ignore + +// Ignored as multishard commands not supported by HTTP package http import ( diff --git a/integration_tests/commands/resp/command_getkeys_test.go b/integration_tests/commands/resp/command_getkeys_test.go index 607bba994..166079b4b 100644 --- a/integration_tests/commands/resp/command_getkeys_test.go +++ b/integration_tests/commands/resp/command_getkeys_test.go @@ -15,7 +15,8 @@ var getKeysTestCases = []struct { {"Get command", "get key", []interface{}{"key"}}, {"TTL command", "ttl key", []interface{}{"key"}}, {"Del command", "del 1 2 3 4 5 6", []interface{}{"1", "2", "3", "4", "5", "6"}}, - {"MSET command", "MSET key1 val1 key2 val2", []interface{}{"key1", "key2"}}, + // TODO: Fix this for multi shard support + //{"MSET command", "MSET key1 val1 key2 val2", []interface{}{"key1", "key2"}}, {"Expire command", "expire key time extra", []interface{}{"key"}}, {"Ping command", "ping", "ERR the command has no key arguments"}, {"Invalid Get command", "get", "ERR invalid number of arguments specified for command"}, diff --git a/integration_tests/commands/websocket/setup.go b/integration_tests/commands/websocket/setup.go index cbfcc286a..281836f30 100644 --- a/integration_tests/commands/websocket/setup.go +++ b/integration_tests/commands/websocket/setup.go @@ -11,10 +11,11 @@ import ( "sync" "time" + "github.com/dicedb/dice/internal/server/httpws" + "github.com/dicedb/dice/config" derrors "github.com/dicedb/dice/internal/errors" "github.com/dicedb/dice/internal/querymanager" - "github.com/dicedb/dice/internal/server" "github.com/dicedb/dice/internal/shard" dstore "github.com/dicedb/dice/internal/store" "github.com/gorilla/websocket" @@ -117,7 +118,7 @@ func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerO shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel) queryWatcherLocal := querymanager.NewQueryManager() config.DiceConfig.WebSocket.Port = opt.Port - testServer := server.NewWebSocketServer(shardManager, testPort1, nil) + testServer := httpws.NewWebSocketServer(shardManager, testPort1, nil) shardManagerCtx, cancelShardManager := context.WithCancel(ctx) // run shard manager diff --git a/integration_tests/commands/websocket/writeretry_test.go b/integration_tests/commands/websocket/writeretry_test.go index 19555ed10..978b836af 100644 --- a/integration_tests/commands/websocket/writeretry_test.go +++ b/integration_tests/commands/websocket/writeretry_test.go @@ -2,6 +2,7 @@ package websocket import ( "fmt" + "github.com/dicedb/dice/internal/server/httpws" "net" "net/http" "net/url" @@ -9,7 +10,6 @@ import ( "testing" "time" - "github.com/dicedb/dice/internal/server" "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" ) @@ -22,7 +22,7 @@ func TestWriteResponseWithRetries_Success(t *testing.T) { defer conn.Close() // Complete a write without any errors - err := server.WriteResponseWithRetries(conn, []byte("hello"), 3) + err := httpws.WriteResponseWithRetries(conn, []byte("hello"), 3) assert.NoError(t, err) } @@ -33,7 +33,7 @@ func TestWriteResponseWithRetries_NetworkError(t *testing.T) { // Simulate a network error by closing the connection beforehand conn.Close() - err := server.WriteResponseWithRetries(conn, []byte("hello"), 3) + err := httpws.WriteResponseWithRetries(conn, []byte("hello"), 3) assert.Error(t, err) assert.Contains(t, err.Error(), "network operation error") } @@ -45,7 +45,7 @@ func TestWriteResponseWithRetries_BrokenPipe(t *testing.T) { // Simulate a broken pipe error by manually triggering it. conn.UnderlyingConn().(*net.TCPConn).CloseWrite() - err := server.WriteResponseWithRetries(conn, []byte("hello"), 3) + err := httpws.WriteResponseWithRetries(conn, []byte("hello"), 3) assert.Error(t, err) assert.Contains(t, err.Error(), "broken pipe") } @@ -60,7 +60,7 @@ func TestWriteResponseWithRetries_EAGAINRetry(t *testing.T) { conn.SetWriteDeadline(time.Now().Add(1 * time.Millisecond)) for retries < 2 { - err := server.WriteResponseWithRetries(conn, []byte("hello"), 3) + err := httpws.WriteResponseWithRetries(conn, []byte("hello"), 3) if err != nil { // Retry and reset deadline after a failed attempt. conn.SetWriteDeadline(time.Now().Add(100 * time.Millisecond)) diff --git a/internal/eval/commands.go b/internal/eval/commands.go index 787bca20c..8b2d94bb7 100644 --- a/internal/eval/commands.go +++ b/internal/eval/commands.go @@ -111,55 +111,6 @@ var ( // their implementation for HTTP and WebSocket protocols is still pending. // As a result, their Eval functions remained intact. var ( - msetCmdMeta = DiceCmdMeta{ - Name: "MSET", - Info: `MSET sets multiple keys to multiple values in the db - args should contain an even number of elements - each pair of elements will be treated as pair - Returns encoded error response if the number of arguments is not even - Returns encoded OK RESP once all entries are added`, - Eval: evalMSET, - Arity: -3, - KeySpecs: KeySpecs{BeginIndex: 1, Step: 2, LastKey: -1}, - } - - jsonMGetCmdMeta = DiceCmdMeta{ - Name: "JSON.MGET", - Info: `JSON.MGET key..key [path] - Returns the encoded RESP value of the key, if present - Null reply: If the key doesn't exist or has expired. - Error reply: If the number of arguments is incorrect or the stored value is not a JSON type.`, - Eval: evalJSONMGET, - Arity: 2, - KeySpecs: KeySpecs{BeginIndex: 1}, - } - - keysCmdMeta = DiceCmdMeta{ - Name: "KEYS", - Info: "KEYS command is used to get all the keys in the database. Complexity is O(n) where n is the number of keys in the database.", - Eval: evalKeys, - Arity: 1, - } - - MGetCmdMeta = DiceCmdMeta{ - Name: "MGET", - Info: `The MGET command returns an array of RESP values corresponding to the provided keys. - For each key, if the key is expired or does not exist, the response will be RespNIL; - otherwise, the response will be the RESP value of the key. - `, - Eval: evalMGET, - Arity: -2, - KeySpecs: KeySpecs{BeginIndex: 1, Step: 1, LastKey: -1}, - } - - //TODO: supports only http protocol, needs to be removed once http is migrated to multishard - copyCmdMeta = DiceCmdMeta{ - Name: "COPY", - Info: `COPY command copies the value stored at the source key to the destination key.`, - Eval: evalCOPY, - Arity: -2, - } - //TODO: supports only http protocol, needs to be removed once http is migrated to multishard objectCopyCmdMeta = DiceCmdMeta{ Name: "OBJECTCOPY", @@ -168,39 +119,6 @@ var ( IsMigrated: true, Arity: -2, } - touchCmdMeta = DiceCmdMeta{ - Name: "TOUCH", - Info: `TOUCH key1 key2 ... key_N - Alters the last access time of a key(s). - A key is ignored if it does not exist.`, - Eval: evalTOUCH, - Arity: -2, - KeySpecs: KeySpecs{BeginIndex: 1}, - } - sdiffCmdMeta = DiceCmdMeta{ - Name: "SDIFF", - Info: `SDIFF key1 [key2 ... key_N] - Returns the members of the set resulting from the difference between the first set and all the successive sets. - Non existing keys are treated as empty sets.`, - Eval: evalSDIFF, - Arity: -2, - KeySpecs: KeySpecs{BeginIndex: 1}, - } - sinterCmdMeta = DiceCmdMeta{ - Name: "SINTER", - Info: `SINTER key1 [key2 ... key_N] - Returns the members of the set resulting from the intersection of all the given sets. - Non existing keys are treated as empty sets.`, - Eval: evalSINTER, - Arity: -2, - KeySpecs: KeySpecs{BeginIndex: 1}, - } - dbSizeCmdMeta = DiceCmdMeta{ - Name: "DBSIZE", - Info: `DBSIZE Return the number of keys in the database`, - Eval: evalDBSIZE, - Arity: 1, - } ) // Single Shard command @@ -716,12 +634,6 @@ var ( IsMigrated: true, NewEval: evalEXISTS, } - renameCmdMeta = DiceCmdMeta{ - Name: "RENAME", - Info: "Renames a key and overwrites the destination", - Eval: evalRename, - Arity: 3, - } getexCmdMeta = DiceCmdMeta{ Name: "GETEX", Info: `Get the value of key and optionally set its expiration. @@ -1429,9 +1341,7 @@ func init() { DiceCmds["COMMAND|INFO"] = commandInfoCmdMeta DiceCmds["COMMAND|DOCS"] = commandDocsCmdMeta DiceCmds["COMMAND|GETKEYSANDFLAGS"] = commandGetKeysAndFlagsCmdMeta - DiceCmds["COPY"] = copyCmdMeta DiceCmds["OBJECTCOPY"] = objectCopyCmdMeta - DiceCmds["DBSIZE"] = dbSizeCmdMeta DiceCmds["DECR"] = decrCmdMeta DiceCmds["DECRBY"] = decrByCmdMeta DiceCmds["DEL"] = delCmdMeta @@ -1481,7 +1391,6 @@ func init() { DiceCmds["JSON.FORGET"] = jsonforgetCmdMeta DiceCmds["JSON.GET"] = jsongetCmdMeta DiceCmds["JSON.INGEST"] = jsoningestCmdMeta - DiceCmds["JSON.MGET"] = jsonMGetCmdMeta DiceCmds["JSON.NUMINCRBY"] = jsonnumincrbyCmdMeta DiceCmds["JSON.NUMMULTBY"] = jsonnummultbyCmdMeta DiceCmds["JSON.OBJKEYS"] = jsonobjkeysCmdMeta @@ -1491,13 +1400,10 @@ func init() { DiceCmds["JSON.STRLEN"] = jsonStrlenCmdMeta DiceCmds["JSON.TOGGLE"] = jsontoggleCmdMeta DiceCmds["JSON.TYPE"] = jsontypeCmdMeta - DiceCmds["KEYS"] = keysCmdMeta DiceCmds["LATENCY"] = latencyCmdMeta DiceCmds["LLEN"] = llenCmdMeta DiceCmds["LPOP"] = lpopCmdMeta DiceCmds["LPUSH"] = lpushCmdMeta - DiceCmds["MGET"] = MGetCmdMeta - DiceCmds["MSET"] = msetCmdMeta DiceCmds["OBJECT"] = objectCmdMeta DiceCmds["PERSIST"] = persistCmdMeta DiceCmds["PFADD"] = pfAddCmdMeta @@ -1505,21 +1411,17 @@ func init() { DiceCmds["PFMERGE"] = pfMergeCmdMeta DiceCmds["PING"] = pingCmdMeta DiceCmds["PTTL"] = pttlCmdMeta - DiceCmds["RENAME"] = renameCmdMeta DiceCmds["RESTORE"] = restorekeyCmdMeta DiceCmds["RPOP"] = rpopCmdMeta DiceCmds["RPUSH"] = rpushCmdMeta DiceCmds["SADD"] = saddCmdMeta DiceCmds["SCARD"] = scardCmdMeta - DiceCmds["SDIFF"] = sdiffCmdMeta DiceCmds["SET"] = setCmdMeta DiceCmds["SETBIT"] = setBitCmdMeta DiceCmds["SETEX"] = setexCmdMeta - DiceCmds["SINTER"] = sinterCmdMeta DiceCmds["SLEEP"] = sleepCmdMeta DiceCmds["SMEMBERS"] = smembersCmdMeta DiceCmds["SREM"] = sremCmdMeta - DiceCmds["TOUCH"] = touchCmdMeta DiceCmds["TTL"] = ttlCmdMeta DiceCmds["TYPE"] = typeCmdMeta DiceCmds["ZADD"] = zaddCmdMeta diff --git a/internal/eval/eval.go b/internal/eval/eval.go index 0b28e079d..c571bd6a0 100644 --- a/internal/eval/eval.go +++ b/internal/eval/eval.go @@ -1,26 +1,20 @@ package eval import ( - "errors" "fmt" - "sort" "strconv" - "strings" "time" "github.com/dicedb/dice/internal/object" "github.com/dicedb/dice/internal/sql" - "github.com/bytedance/sonic" "github.com/dicedb/dice/config" "github.com/dicedb/dice/internal/clientio" "github.com/dicedb/dice/internal/comm" diceerrors "github.com/dicedb/dice/internal/errors" "github.com/dicedb/dice/internal/querymanager" - "github.com/dicedb/dice/internal/server/utils" dstore "github.com/dicedb/dice/internal/store" - "github.com/ohler55/ojg/jp" ) type exDurationState int @@ -146,213 +140,6 @@ func EvalAUTH(args []string, c *comm.Client) []byte { return clientio.RespOK } -// evalMSET puts multiple pairs in db as in the args -// MSET is atomic, so all given keys are set at once. -// args must contain key and value pairs. - -// Returns encoded error response if at least a pair is not part of args -// Returns encoded OK RESP once new entries are added -// If the key already exists then the value will be overwritten and expiry will be discarded -// TODO: Needs to be removed after http and websocket migrated to the multithreading -func evalMSET(args []string, store *dstore.Store) []byte { - if len(args) <= 1 || len(args)%2 != 0 { - return diceerrors.NewErrArity("MSET") - } - - // MSET does not have expiry support - var exDurationMs int64 = -1 - - insertMap := make(map[string]*object.Obj, len(args)/2) - for i := 0; i < len(args); i += 2 { - key, value := args[i], args[i+1] - storedValue, oType := getRawStringOrInt(value) - insertMap[key] = store.NewObj(storedValue, exDurationMs, oType) - } - - store.PutAll(insertMap) - return clientio.RespOK -} - -// evalDBSIZE returns the number of keys in the database. -// TODO: Needs to be removed after http and websocket migrated to the multithreading -func evalDBSIZE(args []string, store *dstore.Store) []byte { - if len(args) > 0 { - return diceerrors.NewErrArity("DBSIZE") - } - - // Expired keys must be explicitly deleted since the cronFrequency for cleanup is configurable. - // A longer delay may prevent timely cleanup, leading to incorrect DBSIZE results. - dstore.DeleteExpiredKeys(store) - // return the RESP encoded value - return clientio.Encode(store.GetDBSize(), false) -} - -// trimElementAndUpdateArray trim the array between the given start and stop index -// Returns trimmed array -func trimElementAndUpdateArray(arr []any, start, stop int) []any { - updatedArray := make([]any, 0) - length := len(arr) - if len(arr) == 0 { - return updatedArray - } - var startIdx, stopIdx int - - if start >= length { - return updatedArray - } - - startIdx = adjustIndex(start, arr) - stopIdx = adjustIndex(stop, arr) - - if startIdx > stopIdx { - return updatedArray - } - - updatedArray = arr[startIdx : stopIdx+1] - return updatedArray -} - -// insertElementAndUpdateArray add an element at the given index -// Returns remaining array and error -func insertElementAndUpdateArray(arr []any, index int, elements []interface{}) (updatedArray []any, err error) { - length := len(arr) - var idx int - if index >= -length && index <= length { - idx = adjustIndex(index, arr) - } else { - return nil, errors.New("index out of bounds") - } - before := arr[:idx] - after := arr[idx:] - - elements = append(elements, after...) - before = append(before, elements...) - updatedArray = append(updatedArray, before...) - return updatedArray, nil -} - -// adjustIndex will bound the array between 0 and len(arr) - 1 -// It also handles negative indexes -func adjustIndex(idx int, arr []any) int { - // if index is positive and out of bound, limit it to the last index - if idx > len(arr) { - idx = len(arr) - 1 - } - - // if index is negative, change it to equivalent positive index - if idx < 0 { - // if index is out of bound then limit it to the first index - if idx < -len(arr) { - idx = 0 - } else { - idx = len(arr) + idx - } - } - return idx -} - -// evalJSONMGET retrieves a JSON value stored for the multiple key -// args must contain at least the key and a path; -// Returns encoded error response if incorrect number of arguments -// The RESP value of the key is encoded and then returned -// TODO: Needs to be removed after http and websocket migrated to the multithreading -func evalJSONMGET(args []string, store *dstore.Store) []byte { - if len(args) < 2 { - return diceerrors.NewErrArity("JSON.MGET") - } - - var results []interface{} - - // Default path is root if not specified - argsLen := len(args) - path := args[argsLen-1] - - for i := 0; i < (argsLen - 1); i++ { - key := args[i] - result, _ := jsonMGETHelper(store, path, key) - results = append(results, result) - } - - var interfaceObj interface{} = results - return clientio.Encode(interfaceObj, false) -} - -func jsonMGETHelper(store *dstore.Store, path, key string) (result interface{}, err2 []byte) { - // Retrieve the object from the database - obj := store.Get(key) - if obj == nil { - return result, nil - } - - // Check if the object is of JSON type - if errWithMessage := object.AssertType(obj.Type, object.ObjTypeJSON); errWithMessage != nil { - return result, errWithMessage - } - - jsonData := obj.Value - - // If path is root, return the entire JSON - if path == defaultRootPath { - resultBytes, err := sonic.Marshal(jsonData) - if err != nil { - return result, diceerrors.NewErrWithMessage("could not serialize result") - } - return string(resultBytes), nil - } - - // Parse the JSONPath expression - expr, err := jp.ParseString(path) - if err != nil { - return result, diceerrors.NewErrWithMessage("invalid JSONPath") - } - - // Execute the JSONPath query - results := expr.Get(jsonData) - if len(results) == 0 { - return result, diceerrors.NewErrWithMessage(fmt.Sprintf("Path '%s' does not exist", path)) - } - - // Serialize the result - var resultBytes []byte - if len(results) == 1 { - resultBytes, err = sonic.Marshal(results[0]) - } else { - resultBytes, err = sonic.Marshal(results) - } - if err != nil { - return nil, diceerrors.NewErrWithMessage("could not serialize result") - } - return string(resultBytes), nil -} - -// ReverseSlice takes a slice of any type and returns a new slice with the elements reversed. -func ReverseSlice[T any](slice []T) []T { - reversed := make([]T, len(slice)) - for i, v := range slice { - reversed[len(slice)-1-i] = v - } - return reversed -} - -// Parses and returns the input string as an int64 or float64 -func parseFloatInt(input string) (result interface{}, err error) { - // Try to parse as an integer - if intValue, parseErr := strconv.ParseInt(input, 10, 64); parseErr == nil { - result = intValue - return - } - - // Try to parse as a float - if floatValue, parseErr := strconv.ParseFloat(input, 64); parseErr == nil { - result = floatValue - return - } - - // If neither parsing succeeds, return an error - err = errors.New("invalid input: not a valid int or float") - return -} - func evalHELLO(args []string, store *dstore.Store) []byte { if len(args) > 1 { return diceerrors.NewErrArity("HELLO") @@ -473,316 +260,3 @@ func EvalQUNWATCH(args []string, httpOp bool, client *comm.Client) []byte { return clientio.RespOK } - -// evalKeys returns the list of keys that match the pattern should be the only param in args -// TODO: Needs to be removed after http and websocket migrated to the multithreading -func evalKeys(args []string, store *dstore.Store) []byte { - if len(args) != 1 { - return diceerrors.NewErrArity("KEYS") - } - - pattern := args[0] - keys, err := store.Keys(pattern) - if err != nil { - return clientio.Encode(err, false) - } - - return clientio.Encode(keys, false) -} - -// TODO: Needs to be removed after http and websocket migrated to the multithreading -func evalRename(args []string, store *dstore.Store) []byte { - if len(args) != 2 { - return diceerrors.NewErrArity("RENAME") - } - sourceKey := args[0] - destKey := args[1] - - // if Source key does not exist, return RESP encoded nil - sourceObj := store.Get(sourceKey) - if sourceObj == nil { - return diceerrors.NewErrWithMessage(diceerrors.NoKeyErr) - } - - // if Source and Destination Keys are same return RESP encoded ok - if sourceKey == destKey { - return clientio.RespOK - } - - if ok := store.Rename(sourceKey, destKey); ok { - return clientio.RespOK - } - return clientio.RespNIL -} - -// The MGET command returns an array of RESP values corresponding to the provided keys. -// For each key, if the key is expired or does not exist, the response will be response.RespNIL; -// otherwise, the response will be the RESP value of the key. -// MGET is atomic, it retrieves all values at once -// TODO: Needs to be removed after http and websocket migrated to the multithreading -func evalMGET(args []string, store *dstore.Store) []byte { - if len(args) < 1 { - return diceerrors.NewErrArity("MGET") - } - values := store.GetAll(args) - resp := make([]interface{}, len(args)) - for i, obj := range values { - if obj == nil { - resp[i] = clientio.RespNIL - } else { - resp[i] = obj.Value - } - } - return clientio.Encode(resp, false) -} - -// TODO: Needs to be removed after http and websocket migrated to the multithreading -func evalCOPY(args []string, store *dstore.Store) []byte { - if len(args) < 2 { - return diceerrors.NewErrArity("COPY") - } - - isReplace := false - - sourceKey := args[0] - destinationKey := args[1] - sourceObj := store.Get(sourceKey) - if sourceObj == nil { - return clientio.RespZero - } - - for i := 2; i < len(args); i++ { - arg := strings.ToUpper(args[i]) - if arg == dstore.Replace { - isReplace = true - } - } - - if isReplace { - store.Del(destinationKey) - } - - destinationObj := store.Get(destinationKey) - if destinationObj != nil { - return clientio.RespZero - } - - copyObj := sourceObj.DeepCopy() - if copyObj == nil { - return clientio.RespZero - } - - exp, ok := dstore.GetExpiry(sourceObj, store) - var exDurationMs int64 = -1 - if ok { - exDurationMs = int64(exp - uint64(utils.GetCurrentTime().UnixMilli())) - } - - store.Put(destinationKey, copyObj) - - if exDurationMs > 0 { - store.SetExpiry(copyObj, exDurationMs) - } - return clientio.RespOne -} - -// TODO: Needs to be removed after http and websocket migrated to the multithreading -func evalTOUCH(args []string, store *dstore.Store) []byte { - if len(args) == 0 { - return diceerrors.NewErrArity("TOUCH") - } - - count := 0 - for _, key := range args { - if store.Get(key) != nil { - count++ - } - } - - return clientio.Encode(count, false) -} - -func evalSDIFF(args []string, store *dstore.Store) []byte { - if len(args) < 1 { - return diceerrors.NewErrArity("SDIFF") - } - - srcKey := args[0] - obj := store.Get(srcKey) - - // if the source key does not exist, return an empty response - if obj == nil { - return clientio.Encode([]string{}, false) - } - - if err := object.AssertType(obj.Type, object.ObjTypeSet); err != nil { - return diceerrors.NewErrWithFormattedMessage(diceerrors.WrongTypeErr) - } - - // Get the set object from the store. - // store the count as the number of elements in the first set - srcSet := obj.Value.(map[string]struct{}) - count := len(srcSet) - - tmpSet := make(map[string]struct{}, count) - for k := range srcSet { - tmpSet[k] = struct{}{} - } - - // we decrement the count as we find the elements in the other sets - // if the count is 0, we skip further sets but still get them from - // the store to check if they are set objects and update their last accessed time - - for _, arg := range args[1:] { - // Get the set object from the store. - obj := store.Get(arg) - - if obj == nil { - continue - } - - // If the object exists, check if it is a set object. - if err := object.AssertType(obj.Type, object.ObjTypeSet); err != nil { - return diceerrors.NewErrWithFormattedMessage(diceerrors.WrongTypeErr) - } - - // only if the count is greater than 0, we need to check the other sets - if count > 0 { - // Get the set object. - set := obj.Value.(map[string]struct{}) - - for k := range set { - if _, ok := tmpSet[k]; ok { - delete(tmpSet, k) - count-- - } - } - } - } - - if count == 0 { - return clientio.Encode([]string{}, false) - } - - // Get the members of the set. - members := make([]string, 0, len(tmpSet)) - for k := range tmpSet { - members = append(members, k) - } - return clientio.Encode(members, false) -} - -// Migrated to the new eval, but kept for http and websocket -func evalSINTER(args []string, store *dstore.Store) []byte { - if len(args) < 1 { - return diceerrors.NewErrArity("SINTER") - } - - sets := make([]map[string]struct{}, 0, len(args)) - - empty := 0 - - for _, arg := range args { - // Get the set object from the store. - obj := store.Get(arg) - - if obj == nil { - empty++ - continue - } - - // If the object exists, check if it is a set object. - if err := object.AssertType(obj.Type, object.ObjTypeSet); err != nil { - return diceerrors.NewErrWithFormattedMessage(diceerrors.WrongTypeErr) - } - - // Get the set object. - set := obj.Value.(map[string]struct{}) - sets = append(sets, set) - } - - if empty > 0 { - return clientio.Encode([]string{}, false) - } - - // sort the sets by the number of elements in the set - // we will iterate over the smallest set - // and check if the element is present in all the other sets - sort.Slice(sets, func(i, j int) bool { - return len(sets[i]) < len(sets[j]) - }) - - count := 0 - resultSet := make(map[string]struct{}, len(sets[0])) - - // init the result set with the first set - // store the number of elements in the first set in count - // we will decrement the count if we do not find the elements in the other sets - for k := range sets[0] { - resultSet[k] = struct{}{} - count++ - } - - for i := 1; i < len(sets); i++ { - if count == 0 { - break - } - for k := range resultSet { - if _, ok := sets[i][k]; !ok { - delete(resultSet, k) - count-- - } - } - } - - if count == 0 { - return clientio.Encode([]string{}, false) - } - - members := make([]string, 0, len(resultSet)) - for k := range resultSet { - members = append(members, k) - } - return clientio.Encode(members, false) -} - -// formatFloat formats float64 as string. -// Optionally appends a decimal (.0) for whole numbers, -// if b is true. -func formatFloat(f float64, b bool) string { - formatted := strconv.FormatFloat(f, 'f', -1, 64) - if b { - parts := strings.Split(formatted, ".") - if len(parts) == 1 { - formatted += ".0" - } - } - return formatted -} - -// This method executes each operation, contained in ops array, based on commands used. -func executeBitfieldOps(value *ByteArray, ops []utils.BitFieldOp) []interface{} { - overflowType := WRAP - var result []interface{} - for _, op := range ops { - switch op.Kind { - case GET: - res := value.getBits(int(op.Offset), int(op.EVal), op.EType == SIGNED) - result = append(result, res) - case SET: - prevValue := value.getBits(int(op.Offset), int(op.EVal), op.EType == SIGNED) - value.setBits(int(op.Offset), int(op.EVal), op.Value) - result = append(result, prevValue) - case INCRBY: - res, err := value.incrByBits(int(op.Offset), int(op.EVal), op.Value, overflowType, op.EType == SIGNED) - if err != nil { - result = append(result, nil) - } else { - result = append(result, res) - } - case OVERFLOW: - overflowType = op.EType - } - } - return result -} diff --git a/internal/eval/eval_test.go b/internal/eval/eval_test.go index f1a629a6e..41b0088df 100644 --- a/internal/eval/eval_test.go +++ b/internal/eval/eval_test.go @@ -43,7 +43,6 @@ func setupTest(store *dstore.Store) *dstore.Store { func TestEval(t *testing.T) { store := dstore.NewStore(nil, nil, nil) - testEvalMSET(t, store) testEvalECHO(t, store) testEvalHELLO(t, store) testEvalSET(t, store) @@ -72,7 +71,6 @@ func TestEval(t *testing.T) { testEvalEXPIRE(t, store) testEvalEXPIRETIME(t, store) testEvalEXPIREAT(t, store) - testEvalDbsize(t, store) testEvalGETSET(t, store) testEvalHSET(t, store) testEvalHMSET(t, store) @@ -132,7 +130,6 @@ func TestEval(t *testing.T) { testEvalBitFieldRO(t, store) testEvalGEOADD(t, store) testEvalGEODIST(t, store) - testEvalSINTER(t, store) testEvalJSONSTRAPPEND(t, store) testEvalINCR(t, store) testEvalINCRBY(t, store) @@ -464,19 +461,6 @@ func testEvalGETDEL(t *testing.T, store *dstore.Store) { runMigratedEvalTests(t, tests, evalGETDEL, store) } -func testEvalMSET(t *testing.T, store *dstore.Store) { - tests := map[string]evalTestCase{ - "nil value": {input: nil, output: []byte("-ERR wrong number of arguments for 'mset' command\r\n")}, - "empty array": {input: []string{}, output: []byte("-ERR wrong number of arguments for 'mset' command\r\n")}, - "one value": {input: []string{"KEY"}, output: []byte("-ERR wrong number of arguments for 'mset' command\r\n")}, - "key val pair": {input: []string{"KEY", "VAL"}, output: clientio.RespOK}, - "odd key val pair": {input: []string{"KEY", "VAL", "KEY2"}, output: []byte("-ERR wrong number of arguments for 'mset' command\r\n")}, - "even key val pair": {input: []string{"KEY", "VAL", "KEY2", "VAL2"}, output: clientio.RespOK}, - } - - runEvalTests(t, tests, evalMSET, store) -} - func testEvalGET(t *testing.T, store *dstore.Store) { tests := []evalTestCase{ { @@ -2896,54 +2880,6 @@ func testEvalPersist(t *testing.T, store *dstore.Store) { runMigratedEvalTests(t, tests, evalPERSIST, store) } -func testEvalDbsize(t *testing.T, store *dstore.Store) { - tests := map[string]evalTestCase{ - "DBSIZE command with invalid no of args": { - input: []string{"INVALID_ARG"}, - output: []byte("-ERR wrong number of arguments for 'dbsize' command\r\n"), - }, - "no key in db": { - input: nil, - output: []byte(":0\r\n"), - }, - "one key exists in db": { - setup: func() { - evalSET([]string{"key", "val"}, store) - }, - input: nil, - output: []byte(":1\r\n"), - }, - "two keys exist in db": { - setup: func() { - evalSET([]string{"key1", "val1"}, store) - evalSET([]string{"key2", "val2"}, store) - }, - input: nil, - output: []byte(":2\r\n"), - }, - "repeating keys shall result in same dbsize": { - setup: func() { - evalSET([]string{"key1", "val1"}, store) - evalSET([]string{"key2", "val2"}, store) - evalSET([]string{"key2", "val2"}, store) - }, - input: nil, - output: []byte(":2\r\n"), - }, - "deleted keys shall be reflected in dbsize": { - setup: func() { - evalSET([]string{"key1", "val1"}, store) - evalSET([]string{"key2", "val2"}, store) - evalDEL([]string{"key2"}, store) - }, - input: nil, - output: []byte(":1\r\n"), - }, - } - - runEvalTests(t, tests, evalDBSIZE, store) -} - func testEvalPFADD(t *testing.T, store *dstore.Store) { tests := map[string]evalTestCase{ "PFADD nil value": { @@ -4397,14 +4333,6 @@ func runMigratedEvalTests(t *testing.T, tests map[string]evalTestCase, evalFunc } } -func BenchmarkEvalMSET(b *testing.B) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - store := dstore.NewStore(nil, nil, nil) - evalMSET([]string{"KEY", "VAL", "KEY2", "VAL2"}, store) - } -} - func BenchmarkEvalHSET(b *testing.B) { store := dstore.NewStore(nil, nil, nil) for i := 0; i < b.N; i++ { @@ -5512,7 +5440,7 @@ func testEvalCOMMAND(t *testing.T, store *dstore.Store) { }, }, "command getkeys with an invalid number of arguments for a command": { - input: []string{"GETKEYS", "MSET", "key1"}, + input: []string{"GETKEYS", "SET", "key1"}, migratedOutput: EvalResponse{ Result: nil, Error: diceerrors.ErrGeneral("invalid number of arguments specified for command"), @@ -6097,13 +6025,6 @@ func testEvalHSETNX(t *testing.T, store *dstore.Store) { runMigratedEvalTests(t, tests, evalHSETNX, store) } -func TestMSETConsistency(t *testing.T) { - store := dstore.NewStore(nil, nil, nil) - evalMSET([]string{"KEY", "VAL", "KEY2", "VAL2"}, store) - - assert.Equal(t, "VAL", store.Get("KEY").Value) - assert.Equal(t, "VAL2", store.Get("KEY2").Value) -} func BenchmarkEvalHINCRBY(b *testing.B) { store := dstore.NewStore(nil, nil, nil) @@ -8108,7 +8029,7 @@ func testEvalDUMP(t *testing.T, store *dstore.Store) { input: []string{"INTEGER_KEY"}, migratedOutput: EvalResponse{ Result: "CQUAAAAAAAAACv9+l81XgsShqw==", - Error: nil, + Error: nil, }, }, "dump expired key": { @@ -8326,56 +8247,6 @@ func testEvalGEODIST(t *testing.T, store *dstore.Store) { runMigratedEvalTests(t, tests, evalGEODIST, store) } -func testEvalSINTER(t *testing.T, store *dstore.Store) { - tests := map[string]evalTestCase{ - "intersection of two sets": { - setup: func() { - evalSADD([]string{"set1", "a", "b", "c"}, store) - evalSADD([]string{"set2", "c", "d", "e"}, store) - }, - input: []string{"set1", "set2"}, - output: clientio.Encode([]string{"c"}, false), - }, - "intersection of three sets": { - setup: func() { - evalSADD([]string{"set1", "a", "b", "c"}, store) - evalSADD([]string{"set2", "b", "c", "d"}, store) - evalSADD([]string{"set3", "c", "d", "e"}, store) - }, - input: []string{"set1", "set2", "set3"}, - output: clientio.Encode([]string{"c"}, false), - }, - "intersection with single set": { - setup: func() { - evalSADD([]string{"set1", "a"}, store) - }, - input: []string{"set1"}, - output: clientio.Encode([]string{"a"}, false), - }, - "intersection with a non-existent key": { - setup: func() { - evalSADD([]string{"set1", "a", "b", "c"}, store) - }, - input: []string{"set1", "nonexistent"}, - output: clientio.Encode([]string{}, false), - }, - "intersection with wrong type": { - setup: func() { - evalSADD([]string{"set1", "a", "b", "c"}, store) - store.Put("string", &object.Obj{Value: "string", Type: object.ObjTypeString}) - }, - input: []string{"set1", "string"}, - output: []byte("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"), - }, - "no arguments": { - input: []string{}, - output: diceerrors.NewErrArity("SINTER"), - }, - } - - runEvalTests(t, tests, evalSINTER, store) -} - func testEvalJSONSTRAPPEND(t *testing.T, store *dstore.Store) { tests := map[string]evalTestCase{ "append to single field": { diff --git a/internal/eval/store_eval.go b/internal/eval/store_eval.go index c80ad9d63..015d20b00 100644 --- a/internal/eval/store_eval.go +++ b/internal/eval/store_eval.go @@ -5651,7 +5651,7 @@ func evalJSONTOGGLE(args []string, store *dstore.Store) *EvalResponse { obj.Value = jsonData } - toggleResults = ReverseSlice(toggleResults) + toggleResults = reverseSlice(toggleResults) return makeEvalResult(toggleResults) } @@ -6772,3 +6772,136 @@ func evalCommandDocs(args []string) *EvalResponse { return makeEvalResult(result) } + +// This method executes each operation, contained in ops array, based on commands used. +func executeBitfieldOps(value *ByteArray, ops []utils.BitFieldOp) []interface{} { + overflowType := WRAP + var result []interface{} + for _, op := range ops { + switch op.Kind { + case GET: + res := value.getBits(int(op.Offset), int(op.EVal), op.EType == SIGNED) + result = append(result, res) + case SET: + prevValue := value.getBits(int(op.Offset), int(op.EVal), op.EType == SIGNED) + value.setBits(int(op.Offset), int(op.EVal), op.Value) + result = append(result, prevValue) + case INCRBY: + res, err := value.incrByBits(int(op.Offset), int(op.EVal), op.Value, overflowType, op.EType == SIGNED) + if err != nil { + result = append(result, nil) + } else { + result = append(result, res) + } + case OVERFLOW: + overflowType = op.EType + } + } + return result +} + +// formatFloat formats float64 as string. +// Optionally appends a decimal (.0) for whole numbers, +// if b is true. +func formatFloat(f float64, b bool) string { + formatted := strconv.FormatFloat(f, 'f', -1, 64) + if b { + parts := strings.Split(formatted, ".") + if len(parts) == 1 { + formatted += ".0" + } + } + return formatted +} + +// trimElementAndUpdateArray trim the array between the given start and stop index +// Returns trimmed array +func trimElementAndUpdateArray(arr []any, start, stop int) []any { + updatedArray := make([]any, 0) + length := len(arr) + if len(arr) == 0 { + return updatedArray + } + var startIdx, stopIdx int + + if start >= length { + return updatedArray + } + + startIdx = adjustIndex(start, arr) + stopIdx = adjustIndex(stop, arr) + + if startIdx > stopIdx { + return updatedArray + } + + updatedArray = arr[startIdx : stopIdx+1] + return updatedArray +} + +// insertElementAndUpdateArray add an element at the given index +// Returns remaining array and error +func insertElementAndUpdateArray(arr []any, index int, elements []interface{}) (updatedArray []any, err error) { + length := len(arr) + var idx int + if index >= -length && index <= length { + idx = adjustIndex(index, arr) + } else { + return nil, errors.New("index out of bounds") + } + before := arr[:idx] + after := arr[idx:] + + elements = append(elements, after...) + before = append(before, elements...) + updatedArray = append(updatedArray, before...) + return updatedArray, nil +} + +// adjustIndex will bound the array between 0 and len(arr) - 1 +// It also handles negative indexes +func adjustIndex(idx int, arr []any) int { + // if index is positive and out of bound, limit it to the last index + if idx > len(arr) { + idx = len(arr) - 1 + } + + // if index is negative, change it to equivalent positive index + if idx < 0 { + // if index is out of bound then limit it to the first index + if idx < -len(arr) { + idx = 0 + } else { + idx = len(arr) + idx + } + } + return idx +} + +// reverseSlice takes a slice of any type and returns a new slice with the elements reversed. +func reverseSlice[T any](slice []T) []T { + reversed := make([]T, len(slice)) + for i, v := range slice { + reversed[len(slice)-1-i] = v + } + return reversed +} + +// Parses and returns the input string as an int64 or float64 +func parseFloatInt(input string) (result interface{}, err error) { + // Try to parse as an integer + if intValue, parseErr := strconv.ParseInt(input, 10, 64); parseErr == nil { + result = intValue + return + } + + // Try to parse as a float + if floatValue, parseErr := strconv.ParseFloat(input, 64); parseErr == nil { + result = floatValue + return + } + + // If neither parsing succeeds, return an error + err = errors.New("invalid input: not a valid int or float") + return +} diff --git a/internal/iothread/cmd_meta.go b/internal/iothread/cmd_meta.go index bda33d29f..ce7c35b7b 100644 --- a/internal/iothread/cmd_meta.go +++ b/internal/iothread/cmd_meta.go @@ -129,10 +129,14 @@ const ( CmdGetDel = "GETDEL" CmdLrange = "LRANGE" CmdLinsert = "LINSERT" + CmdJSONArrInsert = "JSON.ARRINSERT" + CmdJSONArrTrim = "JSON.ARRTRIM" CmdJSONArrAppend = "JSON.ARRAPPEND" CmdJSONArrLen = "JSON.ARRLEN" CmdJSONArrPop = "JSON.ARRPOP" CmdJSONClear = "JSON.CLEAR" + CmdJSONSet = "JSON.SET" + CmdJSONObjKeys = "JSON.OBJKEYS" CmdJSONDel = "JSON.DEL" CmdJSONForget = "JSON.FORGET" CmdJSONGet = "JSON.GET" @@ -267,6 +271,12 @@ var CommandsMeta = map[string]CmdMeta{ CmdJSONArrAppend: { CmdType: SingleShard, }, + CmdJSONArrInsert: { + CmdType: SingleShard, + }, + CmdJSONArrTrim: { + CmdType: SingleShard, + }, CmdJSONArrLen: { CmdType: SingleShard, }, @@ -276,6 +286,12 @@ var CommandsMeta = map[string]CmdMeta{ CmdJSONClear: { CmdType: SingleShard, }, + CmdJSONSet: { + CmdType: SingleShard, + }, + CmdJSONObjKeys: { + CmdType: SingleShard, + }, CmdJSONDel: { CmdType: SingleShard, }, diff --git a/internal/server/cmd_meta.go b/internal/server/cmd_meta.go deleted file mode 100644 index fc9927d4a..000000000 --- a/internal/server/cmd_meta.go +++ /dev/null @@ -1,652 +0,0 @@ -package server - -import ( - "github.com/dicedb/dice/internal/cmd" - "github.com/dicedb/dice/internal/comm" - "github.com/dicedb/dice/internal/eval" - "github.com/dicedb/dice/internal/shard" -) - -// Type defines the type of DiceDB command based on how it interacts with shards. -// It uses an integer value to represent different command types. -type Type int - -// Enum values for Type using iota for auto-increment. -// Global commands don't interact with shards, SingleShard commands interact with one shard, -// MultiShard commands interact with multiple shards, and Custom commands require a direct client connection. -const ( - Global Type = iota // Global commands don't need to interact with shards. - SingleShard // Single-shard commands interact with only one shard. - MultiShard // MultiShard commands interact with multiple shards using scatter-gather logic. - Custom // Custom commands involve direct client communication. -) - -// CmdMeta stores metadata about DiceDB commands, including how they are processed across shards. -// Type indicates how the command should be handled, while Breakup and Gather provide logic -// for breaking up multishard commands and gathering their responses. -type CmdMeta struct { - Cmd string // Command name. - Breakup func(mgr *shard.ShardManager, DiceDBCmd *cmd.DiceDBCmd, c *comm.Client) []cmd.DiceDBCmd // Function to break up multishard commands. - Gather func(responses ...eval.EvalResponse) []byte // Function to gather responses from shards. - RespNoShards func(args []string) []byte // Function for commands that don't interact with shards. - Type // Enum indicating the command type. -} - -// CmdMetaMap is a map that associates command names with their corresponding metadata. -var ( - CmdMetaMap = map[string]CmdMeta{} - - // Metadata for global commands that don't interact with shards. - // PING is an example of global command. - pingCmdMeta = CmdMeta{ - Cmd: "PING", - Type: Global, - } - - // Metadata for single-shard commands that only interact with one shard. - // These commands don't require breakup and gather logic. - setCmdMeta = CmdMeta{ - Cmd: "SET", - Type: SingleShard, - } - expireCmdMeta = CmdMeta{ - Cmd: "EXPIRE", - Type: SingleShard, - } - expireAtCmdMeta = CmdMeta{ - Cmd: "EXPIREAT", - Type: SingleShard, - } - expireTimeCmdMeta = CmdMeta{ - Cmd: "EXPIRETIME", - Type: SingleShard, - } - getCmdMeta = CmdMeta{ - Cmd: "GET", - Type: SingleShard, - } - getsetCmdMeta = CmdMeta{ - Cmd: "GETSET", - Type: SingleShard, - } - setexCmdMeta = CmdMeta{ - Cmd: "SETEX", - Type: SingleShard, - } - saddCmdMeta = CmdMeta{ - Cmd: "SADD", - Type: SingleShard, - } - sremCmdMeta = CmdMeta{ - Cmd: "SREM", - Type: SingleShard, - } - scardCmdMeta = CmdMeta{ - Cmd: "SCARD", - Type: SingleShard, - } - smembersCmdMeta = CmdMeta{ - Cmd: "SMEMBERS", - } - - jsonArrAppendCmdMeta = CmdMeta{ - Cmd: "JSON.ARRAPPEND", - Type: SingleShard, - } - jsonArrLenCmdMeta = CmdMeta{ - Cmd: "JSON.ARRLEN", - Type: SingleShard, - } - jsonArrPopCmdMeta = CmdMeta{ - Cmd: "JSON.ARRPOP", - Type: SingleShard, - } - jsonDebugCmdMeta = CmdMeta{ - Cmd: "JSON.DEBUG", - Type: SingleShard, - } - jsonRespCmdMeta = CmdMeta{ - Cmd: "JSON.RESP", - Type: SingleShard, - } - - getrangeCmdMeta = CmdMeta{ - Cmd: "GETRANGE", - Type: SingleShard, - } - hexistsCmdMeta = CmdMeta{ - Cmd: "HEXISTS", - Type: SingleShard, - } - hkeysCmdMeta = CmdMeta{ - Cmd: "HKEYS", - Type: SingleShard, - } - - hvalsCmdMeta = CmdMeta{ - Cmd: "HVALS", - Type: SingleShard, - } - zaddCmdMeta = CmdMeta{ - Cmd: "ZADD", - Type: SingleShard, - } - zcountCmdMeta = CmdMeta{ - Cmd: "ZCOUNT", - Type: SingleShard, - } - zrangeCmdMeta = CmdMeta{ - Cmd: "ZRANGE", - Type: SingleShard, - } - appendCmdMeta = CmdMeta{ - Cmd: "APPEND", - Type: SingleShard, - } - zpopminCmdMeta = CmdMeta{ - Cmd: "ZPOPMIN", - Type: SingleShard, - } - zrankCmdMeta = CmdMeta{ - Cmd: "ZRANK", - Type: SingleShard, - } - zcardCmdMeta = CmdMeta{ - Cmd: "ZCARD", - Type: SingleShard, - } - zremCmdMeta = CmdMeta{ - Cmd: "ZREM", - Type: SingleShard, - } - pfaddCmdMeta = CmdMeta{ - Cmd: "PFADD", - Type: SingleShard, - } - pfcountCmdMeta = CmdMeta{ - Cmd: "PFCOUNT", - Type: SingleShard, - } - pfmergeCmdMeta = CmdMeta{ - Cmd: "PFMERGE", - Type: SingleShard, - } - ttlCmdMeta = CmdMeta{ - Cmd: "TTL", - Type: SingleShard, - } - pttlCmdMeta = CmdMeta{ - Cmd: "PTTL", - Type: SingleShard, - } - setbitCmdMeta = CmdMeta{ - Cmd: "SETBIT", - Type: SingleShard, - } - getbitCmdMeta = CmdMeta{ - Cmd: "GETBIT", - Type: SingleShard, - } - bitcountCmdMeta = CmdMeta{ - Cmd: "BITCOUNT", - Type: SingleShard, - } - bitfieldCmdMeta = CmdMeta{ - Cmd: "BITFIELD", - Type: SingleShard, - } - bitposCmdMeta = CmdMeta{ - Cmd: "BITPOS", - Type: SingleShard, - } - bitfieldroCmdMeta = CmdMeta{ - Cmd: "BITFIELD_RO", - Type: SingleShard, - } - delCmdMeta = CmdMeta{ - Cmd: "DEL", - Type: SingleShard, - } - existsCmdMeta = CmdMeta{ - Cmd: "EXISTS", - Type: SingleShard, - } - persistCmdMeta = CmdMeta{ - Cmd: "PERSIST", - Type: SingleShard, - } - typeCmdMeta = CmdMeta{ - Cmd: "TYPE", - Type: SingleShard, - } - - jsonclearCmdMeta = CmdMeta{ - Cmd: "JSON.CLEAR", - Type: SingleShard, - } - - jsonstrlenCmdMeta = CmdMeta{ - Cmd: "JSON.STRLEN", - Type: SingleShard, - } - - jsonobjlenCmdMeta = CmdMeta{ - Cmd: "JSON.OBJLEN", - Type: SingleShard, - } - hlenCmdMeta = CmdMeta{ - Cmd: "HLEN", - Type: SingleShard, - } - hstrlenCmdMeta = CmdMeta{ - Cmd: "HSTRLEN", - Type: SingleShard, - } - hscanCmdMeta = CmdMeta{ - Cmd: "HSCAN", - Type: SingleShard, - } - - jsonarrinsertCmdMeta = CmdMeta{ - Cmd: "JSON.ARRINSERT", - Type: SingleShard, - } - - jsonarrtrimCmdMeta = CmdMeta{ - Cmd: "JSON.ARRTRIM", - Type: SingleShard, - } - - jsonobjkeystCmdMeta = CmdMeta{ - Cmd: "JSON.OBJKEYS", - Type: SingleShard, - } - - incrCmdMeta = CmdMeta{ - Cmd: "INCR", - Type: SingleShard, - } - incrByCmdMeta = CmdMeta{ - Cmd: "INCRBY", - Type: SingleShard, - } - decrCmdMeta = CmdMeta{ - Cmd: "DECR", - Type: SingleShard, - } - decrByCmdMeta = CmdMeta{ - Cmd: "DECRBY", - Type: SingleShard, - } - incrByFloatCmdMeta = CmdMeta{ - Cmd: "INCRBYFLOAT", - Type: SingleShard, - } - hincrbyCmdMeta = CmdMeta{ - Cmd: "HINCRBY", - Type: SingleShard, - } - hincrbyfloatCmdMeta = CmdMeta{ - Cmd: "HINCRBYFLOAT", - Type: SingleShard, - } - hrandfieldCmdMeta = CmdMeta{ - Cmd: "HRANDFIELD", - Type: SingleShard, - } - zpopmaxCmdMeta = CmdMeta{ - Cmd: "ZPOPMAX", - Type: SingleShard, - } - bfaddCmdMeta = CmdMeta{ - Cmd: "BF.ADD", - Type: SingleShard, - } - bfreserveCmdMeta = CmdMeta{ - Cmd: "BF.RESERVE", - Type: SingleShard, - } - bfexistsCmdMeta = CmdMeta{ - Cmd: "BF.EXISTS", - Type: SingleShard, - } - bfinfoCmdMeta = CmdMeta{ - Cmd: "BF.INFO", - Type: SingleShard, - } - cmsInitByDimCmdMeta = CmdMeta{ - Cmd: "CMS.INITBYDIM", - Type: SingleShard, - } - cmsInitByProbCmdMeta = CmdMeta{ - Cmd: "CMS.INITBYPROB", - Type: SingleShard, - } - cmsInfoCmdMeta = CmdMeta{ - Cmd: "CMS.INFO", - Type: SingleShard, - } - cmsIncrByCmdMeta = CmdMeta{ - Cmd: "CMS.INCRBY", - Type: SingleShard, - } - cmsQueryCmdMeta = CmdMeta{ - Cmd: "CMS.QUERY", - Type: SingleShard, - } - cmsMergeCmdMeta = CmdMeta{ - Cmd: "CMS.MERGE", - Type: SingleShard, - } - getexCmdMeta = CmdMeta{ - Cmd: "GETEX", - Type: SingleShard, - } - getdelCmdMeta = CmdMeta{ - Cmd: "GETDEL", - Type: SingleShard, - } - hsetCmdMeta = CmdMeta{ - Cmd: "HSET", - Type: SingleShard, - } - hgetCmdMeta = CmdMeta{ - Cmd: "HGET", - Type: SingleShard, - } - hsetnxCmdMeta = CmdMeta{ - Cmd: "HSETNX", - Type: SingleShard, - } - hdelCmdMeta = CmdMeta{ - Cmd: "HDEL", - Type: SingleShard, - } - hmsetCmdMeta = CmdMeta{ - Cmd: "HMSET", - Type: SingleShard, - } - hmgetCmdMeta = CmdMeta{ - Cmd: "HMGET", - Type: SingleShard, - } - lrangeCmdMeta = CmdMeta{ - Cmd: "LRANGE", - Type: SingleShard, - } - linsertCmdMeta = CmdMeta{ - Cmd: "LINSERT", - Type: SingleShard, - } - lpushCmdMeta = CmdMeta{ - Cmd: "LPUSH", - Type: SingleShard, - } - rpushCmdMeta = CmdMeta{ - Cmd: "RPUSH", - Type: SingleShard, - } - lpopCmdMeta = CmdMeta{ - Cmd: "LPOP", - Type: SingleShard, - } - rpopCmdMeta = CmdMeta{ - Cmd: "RPOP", - Type: SingleShard, - } - llenCmdMeta = CmdMeta{ - Cmd: "LLEN", - Type: SingleShard, - } - jsonForgetCmdMeta = CmdMeta{ - Cmd: "JSON.FORGET", - Type: SingleShard, - } - jsonDelCmdMeta = CmdMeta{ - Cmd: "JSON.DEL", - Type: SingleShard, - } - jsonToggleCmdMeta = CmdMeta{ - Cmd: "JSON.TOGGLE", - Type: SingleShard, - } - jsonNumIncrByCmdMeta = CmdMeta{ - Cmd: "JSON.NUMINCRBY", - Type: SingleShard, - } - jsonNumMultByCmdMeta = CmdMeta{ - Cmd: "JSON.NUMMULTBY", - Type: SingleShard, - } - jsonSetCmdMeta = CmdMeta{ - Cmd: "JSON.SET", - Type: SingleShard, - } - jsonGetCmdMeta = CmdMeta{ - Cmd: "JSON.GET", - Type: SingleShard, - } - jsonTypeCmdMeta = CmdMeta{ - Cmd: "JSON.TYPE", - Type: SingleShard, - } - jsonIngestCmdMeta = CmdMeta{ - Cmd: "JSON.INGEST", - Type: SingleShard, - } - jsonArrStrAppendCmdMeta = CmdMeta{ - Cmd: "JSON.STRAPPEND", - Type: SingleShard, - } - hGetAllCmdMeta = CmdMeta{ - Cmd: "HGETALL", - Type: SingleShard, - } - dumpCmdMeta = CmdMeta{ - Cmd: "DUMP", - Type: SingleShard, - } - restoreCmdMeta = CmdMeta{ - Cmd: "RESTORE", - Type: SingleShard, - } - geoaddCmdMeta = CmdMeta{ - Cmd: "GEOADD", - Type: SingleShard, - } - geodistCmdMeta = CmdMeta{ - Cmd: "GEODIST", - Type: SingleShard, - } - clientCmdMeta = CmdMeta{ - Cmd: "CLIENT", - Type: SingleShard, - } - latencyCmdMeta = CmdMeta{ - Cmd: "LATENCY", - Type: SingleShard, - } - flushDBCmdMeta = CmdMeta{ - Cmd: "FLUSHDB", - Type: MultiShard, - } - objectCmdMeta = CmdMeta{ - Cmd: "OBJECT", - Type: SingleShard, - } - commandCmdMeta = CmdMeta{ - Cmd: "COMMAND", - Type: SingleShard, - } - CmdCommandCountMeta = CmdMeta{ - Cmd: "COMMAND|COUNT", - Type: SingleShard, - } - CmdCommandHelp = CmdMeta{ - Cmd: "COMMAND|HELP", - Type: SingleShard, - } - CmdCommandInfo = CmdMeta{ - Cmd: "COMMAND|INFO", - Type: SingleShard, - } - CmdCommandList = CmdMeta{ - Cmd: "COMMAND|LIST", - Type: SingleShard, - } - CmdCommandDocs = CmdMeta{ - Cmd: "COMMAND|DOCS", - Type: SingleShard, - } - CmdCommandGetKeys = CmdMeta{ - Cmd: "COMMAND|GETKEYS", - Type: SingleShard, - } - CmdCommandGetKeysFlags = CmdMeta{ - Cmd: "COMMAND|GETKEYSANDFLAGS", - Type: SingleShard, - } - - // Metadata for multishard commands would go here. - // These commands require both breakup and gather logic. - - // Metadata for custom commands requiring specific client-side logic would go here. -) - -// init initializes the CmdMetaMap map by associating each command name with its corresponding metadata. -func init() { - // Global commands. - CmdMetaMap["PING"] = pingCmdMeta - - // Single-shard commands. - CmdMetaMap["SET"] = setCmdMeta - CmdMetaMap["EXPIRE"] = expireCmdMeta - CmdMetaMap["EXPIREAT"] = expireAtCmdMeta - CmdMetaMap["EXPIRETIME"] = expireTimeCmdMeta - CmdMetaMap["GET"] = getCmdMeta - CmdMetaMap["GETSET"] = getsetCmdMeta - CmdMetaMap["SETEX"] = setexCmdMeta - - CmdMetaMap["SADD"] = saddCmdMeta - CmdMetaMap["SREM"] = sremCmdMeta - CmdMetaMap["SCARD"] = scardCmdMeta - CmdMetaMap["SMEMBERS"] = smembersCmdMeta - - CmdMetaMap["JSON.ARRAPPEND"] = jsonArrAppendCmdMeta - CmdMetaMap["JSON.ARRLEN"] = jsonArrLenCmdMeta - CmdMetaMap["JSON.ARRPOP"] = jsonArrPopCmdMeta - CmdMetaMap["JSON.DEBUG"] = jsonDebugCmdMeta - CmdMetaMap["JSON.RESP"] = jsonRespCmdMeta - - CmdMetaMap["GETRANGE"] = getrangeCmdMeta - CmdMetaMap["APPEND"] = appendCmdMeta - CmdMetaMap["JSON.CLEAR"] = jsonclearCmdMeta - CmdMetaMap["JSON.STRLEN"] = jsonstrlenCmdMeta - CmdMetaMap["JSON.OBJLEN"] = jsonobjlenCmdMeta - CmdMetaMap["HEXISTS"] = hexistsCmdMeta - CmdMetaMap["HKEYS"] = hkeysCmdMeta - CmdMetaMap["HVALS"] = hvalsCmdMeta - CmdMetaMap["JSON.ARRINSERT"] = jsonarrinsertCmdMeta - CmdMetaMap["JSON.ARRTRIM"] = jsonarrtrimCmdMeta - CmdMetaMap["JSON.OBJKEYS"] = jsonobjkeystCmdMeta - CmdMetaMap["ZADD"] = zaddCmdMeta - CmdMetaMap["ZCOUNT"] = zcountCmdMeta - CmdMetaMap["ZRANGE"] = zrangeCmdMeta - CmdMetaMap["ZRANK"] = zrankCmdMeta - CmdMetaMap["ZCARD"] = zcardCmdMeta - CmdMetaMap["ZREM"] = zremCmdMeta - CmdMetaMap["PFADD"] = pfaddCmdMeta - CmdMetaMap["ZPOPMIN"] = zpopminCmdMeta - CmdMetaMap["PFCOUNT"] = pfcountCmdMeta - CmdMetaMap["PFMERGE"] = pfmergeCmdMeta - CmdMetaMap["DEL"] = delCmdMeta - CmdMetaMap["EXISTS"] = existsCmdMeta - CmdMetaMap["PERSIST"] = persistCmdMeta - CmdMetaMap["TYPE"] = typeCmdMeta - CmdMetaMap["HLEN"] = hlenCmdMeta - CmdMetaMap["HSTRLEN"] = hstrlenCmdMeta - CmdMetaMap["HSCAN"] = hscanCmdMeta - CmdMetaMap["INCR"] = incrCmdMeta - CmdMetaMap["INCRBY"] = incrByCmdMeta - CmdMetaMap["INCR"] = incrCmdMeta - CmdMetaMap["DECR"] = decrCmdMeta - CmdMetaMap["DECRBY"] = decrByCmdMeta - CmdMetaMap["INCRBYFLOAT"] = incrByFloatCmdMeta - CmdMetaMap["HINCRBY"] = hincrbyCmdMeta - CmdMetaMap["HINCRBYFLOAT"] = hincrbyfloatCmdMeta - CmdMetaMap["HRANDFIELD"] = hrandfieldCmdMeta - CmdMetaMap["PFADD"] = pfaddCmdMeta - CmdMetaMap["ZPOPMIN"] = zpopminCmdMeta - CmdMetaMap["PFCOUNT"] = pfcountCmdMeta - CmdMetaMap["PFMERGE"] = pfmergeCmdMeta - CmdMetaMap["TTL"] = ttlCmdMeta - CmdMetaMap["PTTL"] = pttlCmdMeta - CmdMetaMap["HINCRBY"] = hincrbyCmdMeta - CmdMetaMap["HINCRBYFLOAT"] = hincrbyfloatCmdMeta - CmdMetaMap["HRANDFIELD"] = hrandfieldCmdMeta - CmdMetaMap["PFADD"] = pfaddCmdMeta - CmdMetaMap["PFCOUNT"] = pfcountCmdMeta - CmdMetaMap["PFMERGE"] = pfmergeCmdMeta - CmdMetaMap["HINCRBY"] = hincrbyCmdMeta - CmdMetaMap["HINCRBYFLOAT"] = hincrbyfloatCmdMeta - CmdMetaMap["HRANDFIELD"] = hrandfieldCmdMeta - CmdMetaMap["ZPOPMAX"] = zpopmaxCmdMeta - CmdMetaMap["BF.ADD"] = bfaddCmdMeta - CmdMetaMap["BF.RESERVE"] = bfreserveCmdMeta - CmdMetaMap["BF.EXISTS"] = bfexistsCmdMeta - CmdMetaMap["BF.INFO"] = bfinfoCmdMeta - CmdMetaMap["CMS.INITBYDIM"] = cmsInitByDimCmdMeta - CmdMetaMap["CMS.INITBYPROB"] = cmsInitByProbCmdMeta - CmdMetaMap["CMS.INFO"] = cmsInfoCmdMeta - CmdMetaMap["CMS.INCRBY"] = cmsIncrByCmdMeta - CmdMetaMap["CMS.QUERY"] = cmsQueryCmdMeta - CmdMetaMap["CMS.MERGE"] = cmsMergeCmdMeta - CmdMetaMap["GETEX"] = getexCmdMeta - CmdMetaMap["GETDEL"] = getdelCmdMeta - CmdMetaMap["HSET"] = hsetCmdMeta - CmdMetaMap["HGET"] = hgetCmdMeta - CmdMetaMap["HSETNX"] = hsetnxCmdMeta - CmdMetaMap["HDEL"] = hdelCmdMeta - CmdMetaMap["HMSET"] = hmsetCmdMeta - CmdMetaMap["HMGET"] = hmgetCmdMeta - CmdMetaMap["SETBIT"] = setbitCmdMeta - CmdMetaMap["GETBIT"] = getbitCmdMeta - CmdMetaMap["BITCOUNT"] = bitcountCmdMeta - CmdMetaMap["BITFIELD"] = bitfieldCmdMeta - CmdMetaMap["BITPOS"] = bitposCmdMeta - CmdMetaMap["BITFIELD_RO"] = bitfieldroCmdMeta - CmdMetaMap["LRANGE"] = lrangeCmdMeta - CmdMetaMap["LINSERT"] = linsertCmdMeta - CmdMetaMap["LPUSH"] = lpushCmdMeta - CmdMetaMap["RPUSH"] = rpushCmdMeta - CmdMetaMap["LPOP"] = lpopCmdMeta - CmdMetaMap["RPOP"] = rpopCmdMeta - CmdMetaMap["LLEN"] = llenCmdMeta - CmdMetaMap["JSON.FORGET"] = jsonForgetCmdMeta - CmdMetaMap["JSON.DEL"] = jsonDelCmdMeta - CmdMetaMap["JSON.TOGGLE"] = jsonToggleCmdMeta - CmdMetaMap["JSON.NUMINCRBY"] = jsonNumIncrByCmdMeta - CmdMetaMap["JSON.NUMMULTBY"] = jsonNumMultByCmdMeta - CmdMetaMap["JSON.SET"] = jsonSetCmdMeta - CmdMetaMap["JSON.GET"] = jsonGetCmdMeta - CmdMetaMap["JSON.TYPE"] = jsonTypeCmdMeta - CmdMetaMap["JSON.INGEST"] = jsonIngestCmdMeta - CmdMetaMap["JSON.STRAPPEND"] = jsonArrStrAppendCmdMeta - CmdMetaMap["HGETALL"] = hGetAllCmdMeta - CmdMetaMap["DUMP"] = dumpCmdMeta - CmdMetaMap["RESTORE"] = restoreCmdMeta - CmdMetaMap["GEOADD"] = geoaddCmdMeta - CmdMetaMap["GEODIST"] = geodistCmdMeta - CmdMetaMap["CLIENT"] = clientCmdMeta - CmdMetaMap["LATENCY"] = latencyCmdMeta - CmdMetaMap["FLUSHDB"] = flushDBCmdMeta - CmdMetaMap["OBJECT"] = objectCmdMeta - CmdMetaMap["COMMAND"] = commandCmdMeta - CmdMetaMap["COMMAND|COUNT"] = CmdCommandCountMeta - CmdMetaMap["COMMAND|HELP"] = CmdCommandHelp - CmdMetaMap["COMMAND|INFO"] = CmdCommandInfo - CmdMetaMap["COMMAND|LIST"] = CmdCommandList - CmdMetaMap["COMMAND|DOCS"] = CmdCommandDocs - CmdMetaMap["COMMAND|GETKEYS"] = CmdCommandGetKeys - CmdMetaMap["COMMAND|GETKEYSANDFLAGS"] = CmdCommandGetKeysFlags -} diff --git a/internal/server/utils/httpResp.go b/internal/server/httpws/httpResp.go similarity index 92% rename from internal/server/utils/httpResp.go rename to internal/server/httpws/httpResp.go index ad66ec391..10f77ee76 100644 --- a/internal/server/utils/httpResp.go +++ b/internal/server/httpws/httpResp.go @@ -1,4 +1,4 @@ -package utils +package httpws const ( HTTPStatusSuccess string = "success" diff --git a/internal/server/httpServer.go b/internal/server/httpws/httpServer.go similarity index 88% rename from internal/server/httpServer.go rename to internal/server/httpws/httpServer.go index d472dbc62..f04c42aec 100644 --- a/internal/server/httpServer.go +++ b/internal/server/httpws/httpServer.go @@ -1,4 +1,4 @@ -package server +package httpws import ( "bytes" @@ -12,6 +12,8 @@ import ( "sync" "time" + "github.com/dicedb/dice/internal/iothread" + "github.com/dicedb/dice/internal/eval" "github.com/dicedb/dice/internal/server/abstractserver" "github.com/dicedb/dice/internal/wal" @@ -22,7 +24,6 @@ import ( "github.com/dicedb/dice/internal/comm" derrors "github.com/dicedb/dice/internal/errors" "github.com/dicedb/dice/internal/ops" - "github.com/dicedb/dice/internal/server/utils" "github.com/dicedb/dice/internal/shard" ) @@ -133,16 +134,16 @@ func (s *HTTPServer) Run(ctx context.Context) error { func (s *HTTPServer) DiceHTTPHandler(writer http.ResponseWriter, request *http.Request) { // convert to REDIS cmd - diceDBCmd, err := utils.ParseHTTPRequest(request) + diceDBCmd, err := ParseHTTPRequest(request) if err != nil { - responseJSON, _ := json.Marshal(utils.HTTPResponse{Status: utils.HTTPStatusError, Data: "Invalid HTTP request format"}) - writer.Header().Set("Content-Type", "application/json") - writer.WriteHeader(http.StatusBadRequest) // Set HTTP status code to 500 - _, err = writer.Write(responseJSON) - if err != nil { - slog.Error("Error writing response", "error", err) - } - slog.Error("Error parsing HTTP request", slog.Any("error", err)) + writeErrorResponse(writer, http.StatusBadRequest, "Invalid HTTP request format", + "Error parsing HTTP request", slog.Any("error", err)) + return + } + + if iothread.CommandsMeta[diceDBCmd.Cmd].CmdType == iothread.MultiShard { + writeErrorResponse(writer, http.StatusBadRequest, "unsupported command", + "Unsupported command received", slog.String("cmd", diceDBCmd.Cmd)) return } @@ -154,14 +155,9 @@ func (s *HTTPServer) DiceHTTPHandler(writer http.ResponseWriter, request *http.R } if unimplementedCommands[diceDBCmd.Cmd] { - responseJSON, _ := json.Marshal(utils.HTTPResponse{Status: utils.HTTPStatusError, Data: fmt.Sprintf("Command %s is not implemented with HTTP", diceDBCmd.Cmd)}) - writer.Header().Set("Content-Type", "application/json") - writer.WriteHeader(http.StatusBadRequest) // Set HTTP status code to 500 - _, err = writer.Write(responseJSON) - if err != nil { - slog.Error("Error writing response", "error", err) - } - slog.Error("Command %s is not implemented", slog.String("cmd", diceDBCmd.Cmd)) + writeErrorResponse(writer, http.StatusBadRequest, + fmt.Sprintf("Command %s is not implemented with HTTP", diceDBCmd.Cmd), + "Command is not implemented", slog.String("cmd", diceDBCmd.Cmd)) return } @@ -181,7 +177,7 @@ func (s *HTTPServer) DiceHTTPHandler(writer http.ResponseWriter, request *http.R func (s *HTTPServer) DiceHTTPQwatchHandler(writer http.ResponseWriter, request *http.Request) { // convert to REDIS cmd - diceDBCmd, err := utils.ParseHTTPRequest(request) + diceDBCmd, err := ParseHTTPRequest(request) if err != nil { http.Error(writer, "Error parsing HTTP request", http.StatusBadRequest) slog.Error("Error parsing HTTP request", slog.Any("error", err)) @@ -336,19 +332,19 @@ func (s *HTTPServer) writeResponse(writer http.ResponseWriter, result *ops.Store var ( responseValue interface{} err error - httpResponse utils.HTTPResponse + httpResponse HTTPResponse isDiceErr bool ) // Check if the command is migrated, if it is we use EvalResponse values // else we use RESPParser to decode the response - _, ok := CmdMetaMap[diceDBCmd.Cmd] + _, ok := iothread.CommandsMeta[diceDBCmd.Cmd] // TODO: Remove this conditional check and if (true) condition when all commands are migrated - if !ok { + if !ok || iothread.CommandsMeta[diceDBCmd.Cmd].CmdType == iothread.Custom { responseValue, err = DecodeEvalResponse(result.EvalResponse) if err != nil { slog.Error("Error decoding response", "error", err) - httpResponse = utils.HTTPResponse{Status: utils.HTTPStatusError, Data: "Internal Server Error"} + httpResponse = HTTPResponse{Status: HTTPStatusError, Data: "Internal Server Error"} writeJSONResponse(writer, httpResponse, http.StatusInternalServerError) return } @@ -362,11 +358,11 @@ func (s *HTTPServer) writeResponse(writer http.ResponseWriter, result *ops.Store } // Create the HTTP response - httpResponse = utils.HTTPResponse{Data: ResponseParser(responseValue)} + httpResponse = HTTPResponse{Data: ResponseParser(responseValue)} if isDiceErr { - httpResponse.Status = utils.HTTPStatusError + httpResponse.Status = HTTPStatusError } else { - httpResponse.Status = utils.HTTPStatusSuccess + httpResponse.Status = HTTPStatusSuccess } // Write the response back to the client @@ -374,7 +370,7 @@ func (s *HTTPServer) writeResponse(writer http.ResponseWriter, result *ops.Store } // Helper function to write the JSON response -func writeJSONResponse(writer http.ResponseWriter, response utils.HTTPResponse, statusCode int) { +func writeJSONResponse(writer http.ResponseWriter, response HTTPResponse, statusCode int) { writer.Header().Set("Content-Type", "application/json") writer.WriteHeader(statusCode) @@ -391,6 +387,18 @@ func writeJSONResponse(writer http.ResponseWriter, response utils.HTTPResponse, } } +func writeErrorResponse(writer http.ResponseWriter, status int, message, logMessage string, logFields ...any) { + responseJSON, _ := json.Marshal(HTTPResponse{Status: HTTPStatusError, Data: message}) + writer.Header().Set("Content-Type", "application/json") + writer.WriteHeader(status) + if _, err := writer.Write(responseJSON); err != nil { + slog.Error("HTTP-WS Error writing response", "error", err) + } + if logMessage != "" { + slog.Error(logMessage, logFields...) + } +} + // ResponseParser parses the response value for both migrated and non-migrated cmds and // returns response to be rendered for HTTP/WS response func ResponseParser(responseValue interface{}) interface{} { diff --git a/internal/server/utils/redisCmdAdapter.go b/internal/server/httpws/redisCmdAdapter.go similarity index 97% rename from internal/server/utils/redisCmdAdapter.go rename to internal/server/httpws/redisCmdAdapter.go index 06e093b27..577a73e68 100644 --- a/internal/server/utils/redisCmdAdapter.go +++ b/internal/server/httpws/redisCmdAdapter.go @@ -1,17 +1,18 @@ -package utils +package httpws import ( "encoding/base64" "encoding/json" "errors" "fmt" - "io" "net/http" "regexp" "strconv" "strings" + "github.com/dicedb/dice/internal/server/utils" + "github.com/dicedb/dice/internal/cmd" diceerrors "github.com/dicedb/dice/internal/errors" ) @@ -64,7 +65,7 @@ func ParseHTTPRequest(r *http.Request) (*cmd.DiceDBCmd, error) { queryParams := r.URL.Query() keyPrefix := queryParams.Get(KeyPrefix) - if keyPrefix != "" && command == JSONIngest { + if keyPrefix != "" && command == utils.JSONIngest { args = append(args, keyPrefix) } // Step 1: Handle JSON body if present @@ -173,7 +174,7 @@ func ParseWebsocketMessage(msg []byte) (*cmd.DiceDBCmd, error) { // if key prefix is empty for JSON.INGEST command // add "" to cmdArr - if command == JSONIngest && len(cmdArr) == 2 { + if command == utils.JSONIngest && len(cmdArr) == 2 { cmdArr = append([]string{""}, cmdArr...) } diff --git a/internal/server/utils/redisCmdAdapter_test.go b/internal/server/httpws/redisCmdAdapter_test.go similarity index 99% rename from internal/server/utils/redisCmdAdapter_test.go rename to internal/server/httpws/redisCmdAdapter_test.go index c50c7cf23..091b8bbc6 100644 --- a/internal/server/utils/redisCmdAdapter_test.go +++ b/internal/server/httpws/redisCmdAdapter_test.go @@ -1,4 +1,4 @@ -package utils +package httpws import ( "net/http/httptest" diff --git a/internal/server/websocketServer.go b/internal/server/httpws/websocketServer.go similarity index 96% rename from internal/server/websocketServer.go rename to internal/server/httpws/websocketServer.go index 93148614e..bc090ff2b 100644 --- a/internal/server/websocketServer.go +++ b/internal/server/httpws/websocketServer.go @@ -1,4 +1,4 @@ -package server +package httpws import ( "bytes" @@ -14,6 +14,8 @@ import ( "syscall" "time" + "github.com/dicedb/dice/internal/iothread" + "github.com/dicedb/dice/internal/server/abstractserver" "github.com/dicedb/dice/internal/wal" @@ -23,7 +25,6 @@ import ( "github.com/dicedb/dice/internal/comm" diceerrors "github.com/dicedb/dice/internal/errors" "github.com/dicedb/dice/internal/ops" - "github.com/dicedb/dice/internal/server/utils" "github.com/dicedb/dice/internal/shard" "github.com/gorilla/websocket" "golang.org/x/exp/rand" @@ -143,7 +144,7 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques } // parse message to dice command - diceDBCmd, err := utils.ParseWebsocketMessage(msg) + diceDBCmd, err := ParseWebsocketMessage(msg) if errors.Is(err, diceerrors.ErrEmptyCommand) { continue } else if err != nil { @@ -153,6 +154,13 @@ func (s *WebsocketServer) WebsocketHandler(w http.ResponseWriter, r *http.Reques continue } + if iothread.CommandsMeta[diceDBCmd.Cmd].CmdType == iothread.MultiShard { + if err := WriteResponseWithRetries(conn, []byte("error: unsupported command"), maxRetries); err != nil { + slog.Debug(fmt.Sprintf("Error writing message: %v", err)) + } + continue + } + // TODO - on abort, close client connection instead of closing server? if diceDBCmd.Cmd == Abort { close(s.shutdownChan) @@ -271,7 +279,7 @@ func (s *WebsocketServer) processResponse(conn *websocket.Conn, diceDBCmd *cmd.D var responseValue interface{} // Check if the command is migrated, if it is we use EvalResponse values // else we use RESPParser to decode the response - _, ok := CmdMetaMap[diceDBCmd.Cmd] + _, ok := iothread.CommandsMeta[diceDBCmd.Cmd] // TODO: Remove this conditional check and if (true) condition when all commands are migrated if !ok { responseValue, err = DecodeEvalResponse(response.EvalResponse) diff --git a/main.go b/main.go index 09744ae52..0eea883a5 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,8 @@ import ( "syscall" "time" + "github.com/dicedb/dice/internal/server/httpws" + "github.com/dicedb/dice/internal/cli" "github.com/dicedb/dice/internal/logger" "github.com/dicedb/dice/internal/server/abstractserver" @@ -25,7 +27,6 @@ import ( diceerrors "github.com/dicedb/dice/internal/errors" "github.com/dicedb/dice/internal/iothread" "github.com/dicedb/dice/internal/observability" - "github.com/dicedb/dice/internal/server" "github.com/dicedb/dice/internal/server/resp" "github.com/dicedb/dice/internal/shard" dstore "github.com/dicedb/dice/internal/store" @@ -140,13 +141,13 @@ func main() { go runServer(ctx, &serverWg, respServer, serverErrCh) if config.DiceConfig.HTTP.Enabled { - httpServer := server.NewHTTPServer(shardManager, wl) + httpServer := httpws.NewHTTPServer(shardManager, wl) serverWg.Add(1) go runServer(ctx, &serverWg, httpServer, serverErrCh) } if config.DiceConfig.WebSocket.Enabled { - websocketServer := server.NewWebSocketServer(shardManager, config.DiceConfig.WebSocket.Port, wl) + websocketServer := httpws.NewWebSocketServer(shardManager, config.DiceConfig.WebSocket.Port, wl) serverWg.Add(1) go runServer(ctx, &serverWg, websocketServer, serverErrCh) }