Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CLEANUP] remove sql and query_manager #1369

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions integration_tests/commands/http/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/dicedb/dice/config"
derrors "github.com/dicedb/dice/internal/errors"
"github.com/dicedb/dice/internal/querymanager"
"github.com/dicedb/dice/internal/shard"
dstore "github.com/dicedb/dice/internal/store"
)
Expand Down Expand Up @@ -115,7 +114,7 @@ func RunHTTPServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption
globalErrChannel := make(chan error)
watchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Performance.WatchChanBufSize)
shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel)
queryWatcherLocal := querymanager.NewQueryManager()

config.DiceConfig.HTTP.Port = opt.Port
// Initialize the HTTPServer
testServer := httpws.NewHTTPServer(shardManager, nil)
Expand All @@ -128,12 +127,6 @@ func RunHTTPServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption
shardManager.Run(shardManagerCtx)
}()

wg.Add(1)
go func() {
defer wg.Done()
queryWatcherLocal.Run(ctx, watchChan)
}()

// Start the server in a goroutine
wg.Add(1)
go func() {
Expand Down
9 changes: 0 additions & 9 deletions integration_tests/commands/websocket/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/dicedb/dice/config"
derrors "github.com/dicedb/dice/internal/errors"
"github.com/dicedb/dice/internal/querymanager"
"github.com/dicedb/dice/internal/shard"
dstore "github.com/dicedb/dice/internal/store"
"github.com/gorilla/websocket"
Expand Down Expand Up @@ -116,7 +115,6 @@ func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerO
globalErrChannel := make(chan error)
watchChan := make(chan dstore.QueryWatchEvent, config.DiceConfig.Performance.WatchChanBufSize)
shardManager := shard.NewShardManager(1, watchChan, nil, globalErrChannel)
queryWatcherLocal := querymanager.NewQueryManager()
config.DiceConfig.WebSocket.Port = opt.Port
testServer := httpws.NewWebSocketServer(shardManager, testPort1, nil)
shardManagerCtx, cancelShardManager := context.WithCancel(ctx)
Expand All @@ -128,13 +126,6 @@ func RunWebsocketServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerO
shardManager.Run(shardManagerCtx)
}()

// run query manager
wg.Add(1)
go func() {
defer wg.Done()
queryWatcherLocal.Run(ctx, watchChan)
}()

// start websocket server
wg.Add(1)
go func() {
Expand Down
11 changes: 0 additions & 11 deletions internal/clientio/resp.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (

"github.com/dicedb/dice/internal/object"

"github.com/dicedb/dice/internal/sql"

"github.com/dicedb/dice/internal/server/utils"
dstore "github.com/dicedb/dice/internal/store"
)
Expand Down Expand Up @@ -282,15 +280,6 @@ func Encode(value interface{}, isSimple bool) []byte {
buf.Write(Encode(fmt.Sprintf("key:%s", we.Key), false))
buf.Write(Encode(fmt.Sprintf("op:%s", we.Operation), false))
return []byte(fmt.Sprintf("*2\r\n%s", buf.Bytes()))
case []sql.QueryResultRow:
var b []byte
buf := bytes.NewBuffer(b) // Create a buffer for accumulating encoded rows.
for _, row := range value.([]sql.QueryResultRow) {
buf.WriteString("*2\r\n") // Start a new array for each row.
buf.Write(Encode(row.Key, false)) // Encode the row key.
buf.Write(Encode(row.Value.Value, false)) // Encode the row value.
}
return []byte(fmt.Sprintf("*%d\r\n%s", len(v), buf.Bytes())) // Return the encoded response.

// Handle map[string]bool and return a nil response indicating unsupported types.
case map[string]bool:
Expand Down
92 changes: 0 additions & 92 deletions internal/eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,10 @@ import (
"strconv"
"time"

"github.com/dicedb/dice/internal/object"

"github.com/dicedb/dice/internal/sql"

"github.com/dicedb/dice/config"
"github.com/dicedb/dice/internal/clientio"
"github.com/dicedb/dice/internal/comm"
diceerrors "github.com/dicedb/dice/internal/errors"
"github.com/dicedb/dice/internal/querymanager"
dstore "github.com/dicedb/dice/internal/store"
)

Expand Down Expand Up @@ -173,90 +168,3 @@ func evalSLEEP(args []string, store *dstore.Store) []byte {
time.Sleep(time.Duration(durationSec) * time.Second)
return clientio.RespOK
}

// EvalQWATCH adds the specified key to the watch list for the caller client.
// Every time a key in the watch list is modified, the client will be sent a response
// containing the new value of the key along with the operation that was performed on it.
// Contains only one argument, the query to be watched.
func EvalQWATCH(args []string, httpOp, websocketOp bool, client *comm.Client, store *dstore.Store) []byte {
if len(args) != 1 {
return diceerrors.NewErrArity("Q.WATCH")
}

// Parse and get the selection from the query.
query, e := sql.ParseQuery( /*sql=*/ args[0])

if e != nil {
return clientio.Encode(e, false)
}

// use an unbuffered channel to ensure that we only proceed to query execution once the query watcher has built the cache
cacheChannel := make(chan *[]struct {
Key string
Value *object.Obj
})
var watchSubscription querymanager.QuerySubscription

if httpOp || websocketOp {
watchSubscription = querymanager.QuerySubscription{
Subscribe: true,
Query: query,
CacheChan: cacheChannel,
QwatchClientChan: client.HTTPQwatchResponseChan,
ClientIdentifierID: client.ClientIdentifierID,
}
} else {
watchSubscription = querymanager.QuerySubscription{
Subscribe: true,
Query: query,
ClientFD: client.Fd,
CacheChan: cacheChannel,
}
}

querymanager.QuerySubscriptionChan <- watchSubscription
store.CacheKeysForQuery(query.Where, cacheChannel)

// Return the result of the query.
responseChan := make(chan querymanager.AdhocQueryResult)
querymanager.AdhocQueryChan <- querymanager.AdhocQuery{
Query: query,
ResponseChan: responseChan,
}

queryResult := <-responseChan
if queryResult.Err != nil {
return clientio.Encode(queryResult.Err, false)
}

// TODO: We should return the list of all queries being watched by the client.
return clientio.Encode(querymanager.GenericWatchResponse(sql.Qwatch, query.String(), *queryResult.Result), false)
}

// EvalQUNWATCH removes the specified key from the watch list for the caller client.
func EvalQUNWATCH(args []string, httpOp bool, client *comm.Client) []byte {
if len(args) != 1 {
return diceerrors.NewErrArity("Q.UNWATCH")
}
query, e := sql.ParseQuery( /*sql=*/ args[0])
if e != nil {
return clientio.Encode(e, false)
}

if httpOp {
querymanager.QuerySubscriptionChan <- querymanager.QuerySubscription{
Subscribe: false,
Query: query,
QwatchClientChan: client.HTTPQwatchResponseChan,
ClientIdentifierID: client.ClientIdentifierID,
}
} else {
querymanager.QuerySubscriptionChan <- querymanager.QuerySubscription{
Subscribe: false,
Query: query,
ClientFD: client.Fd,
}
}

return clientio.RespOK
}
4 changes: 0 additions & 4 deletions internal/eval/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ func (e *Eval) ExecuteCommand() *EvalResponse {
switch diceCmd.Name {
// Old implementation kept as it is, but we will be moving
// to the new implementation soon for all commands
case "SUBSCRIBE", "Q.WATCH":
return &EvalResponse{Result: EvalQWATCH(e.cmd.Args, e.isHTTPOperation, e.isWebSocketOperation, e.client, e.store), Error: nil}
case "UNSUBSCRIBE", "Q.UNWATCH":
return &EvalResponse{Result: EvalQUNWATCH(e.cmd.Args, e.isHTTPOperation, e.client), Error: nil}
case auth.Cmd:
return &EvalResponse{Result: EvalAUTH(e.cmd.Args, e.client), Error: nil}
case "ABORT":
Expand Down
Loading
Loading