Skip to content

Commit

Permalink
init code/directory structure
Browse files Browse the repository at this point in the history
  • Loading branch information
kakdeykaushik committed Dec 5, 2024
1 parent 66a9f33 commit d623913
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 13 deletions.
34 changes: 34 additions & 0 deletions internal/eval/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package eval

import (
"sync"

"github.com/dicedb/dice/internal/global"
)

var (
clients = make([]global.IOThread, 0)
mu = sync.Mutex{}
)

func GetClients() []global.IOThread {
return clients
}

func AddClient(client global.IOThread) {
mu.Lock()
defer mu.Unlock()
clients = append(clients, client)
}

// if id is monotonic binary search can be used
func RemoveClientByID(id string) {
mu.Lock()
defer mu.Unlock()
for i, client := range clients {
if client.ID() == id {
clients = append(clients[:i], clients[i+1:]...)
return
}
}
}
18 changes: 16 additions & 2 deletions internal/eval/store_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -6261,8 +6261,22 @@ func evalKEYS(args []string, store *dstore.Store) *EvalResponse {
}

// TODO: Placeholder to support monitoring
func evalCLIENT(args []string, store *dstore.Store) *EvalResponse {
return makeEvalResult(clientio.OK)
func evalCLIENT(args []string, _ *dstore.Store) *EvalResponse {
if len(args) == 0 {
return makeEvalError(diceerrors.ErrWrongArgumentCount("CLIENT"))
}

subcommand := strings.ToUpper(args[0])
switch subcommand {
case List:
o := make([]string, 0, len(GetClients()))
for _, client := range GetClients() {
o = append(o, client.String())
}
return makeEvalResult(clientio.Encode(strings.Join(o, "\n"), false))
default:
return makeEvalError(diceerrors.ErrWrongArgumentCount("CLIENT"))
}
}

// TODO: Placeholder to support monitoring
Expand Down
10 changes: 10 additions & 0 deletions internal/global/iothread.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package global

import "context"

type IOThread interface {
ID() string
Start(context.Context) error
Stop() error
String() string
}
109 changes: 102 additions & 7 deletions internal/iothread/iothread.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log/slog"
"net"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"
Expand All @@ -15,9 +16,12 @@ import (
"github.com/dicedb/dice/internal/auth"
"github.com/dicedb/dice/internal/clientio"
"github.com/dicedb/dice/internal/clientio/iohandler"
"github.com/dicedb/dice/internal/clientio/iohandler/netconn"
"github.com/dicedb/dice/internal/clientio/requestparser"
"github.com/dicedb/dice/internal/cmd"
diceerrors "github.com/dicedb/dice/internal/errors"
"github.com/dicedb/dice/internal/eval"
"github.com/dicedb/dice/internal/global"
"github.com/dicedb/dice/internal/ops"
"github.com/dicedb/dice/internal/querymanager"
"github.com/dicedb/dice/internal/shard"
Expand All @@ -31,15 +35,11 @@ const defaultRequestTimeout = 6 * time.Second
var requestCounter uint32

// IOThread interface
type IOThread interface {
ID() string
Start(context.Context) error
Stop() error
}

type BaseIOThread struct {
IOThread
global.IOThread
id string
Name string
ioHandler iohandler.IOHandler
parser requestparser.Parser
shardManager *shard.ShardManager
Expand All @@ -50,13 +50,14 @@ type BaseIOThread struct {
preprocessingChan chan *ops.StoreResponse
cmdWatchSubscriptionChan chan watchmanager.WatchSubscription
wl wal.AbstractWAL
lastCmd *cmd.DiceDBCmd
}

func NewIOThread(wid string, responseChan, preprocessingChan chan *ops.StoreResponse,
cmdWatchSubscriptionChan chan watchmanager.WatchSubscription,
ioHandler iohandler.IOHandler, parser requestparser.Parser,
shardManager *shard.ShardManager, gec chan error, wl wal.AbstractWAL) *BaseIOThread {
return &BaseIOThread{
thread := &BaseIOThread{
id: wid,
ioHandler: ioHandler,
parser: parser,
Expand All @@ -69,12 +70,105 @@ func NewIOThread(wid string, responseChan, preprocessingChan chan *ops.StoreResp
cmdWatchSubscriptionChan: cmdWatchSubscriptionChan,
wl: wl,
}
eval.AddClient(thread)
return thread
}

func (t *BaseIOThread) ID() string {
return t.id
}

func addr(fd int) (addr, laddr string, err error) {
// addr
sa, err := syscall.Getpeername(fd)
if err != nil {
return "", "", err
}
switch v := sa.(type) {
case *syscall.SockaddrInet4:
addr = net.IP(v.Addr[:]).String() + ":" + strconv.Itoa(v.Port)
case *syscall.SockaddrInet6:
addr = net.IP(v.Addr[:]).String() + ":" + strconv.Itoa(v.Port)
}

// laddr
sa, err = syscall.Getsockname(fd)
if err != nil {
return "", "", err
}
switch v := sa.(type) {
case *syscall.SockaddrInet4:
laddr = net.IP(v.Addr[:]).String() + ":" + strconv.Itoa(v.Port)
case *syscall.SockaddrInet6:
laddr = net.IP(v.Addr[:]).String() + ":" + strconv.Itoa(v.Port)
}

return addr, laddr, nil
}

func (t *BaseIOThread) String() string {
var s strings.Builder

// id
s.WriteString("id=")
s.WriteString(t.id)
s.WriteString(" ")

// addr and laddr
switch hndlr := t.ioHandler.(type) {

Check failure on line 118 in internal/iothread/iothread.go

View workflow job for this annotation

GitHub Actions / lint

singleCaseSwitch: should rewrite switch statement to if statement (gocritic)
case *netconn.IOHandler:
addr, laddr, _ := addr(hndlr.FileDescriptor())
s.WriteString("addr=")
s.WriteString(addr)
s.WriteString(" ")

s.WriteString("laddr=")
s.WriteString(laddr)
s.WriteString(" ")

s.WriteString("fd=")
s.WriteString(strconv.FormatInt(int64(hndlr.FileDescriptor()), 10))
s.WriteString(" ")
}

// name
s.WriteString("name=")
s.WriteString(t.Name)
s.WriteString(" ")

// age
s.WriteString("age=")
s.WriteString(strconv.FormatFloat(time.Since(t.Session.CreatedAt).Seconds(), 'f', 0, 64))
s.WriteString(" ")

// idle
s.WriteString("idle=")
s.WriteString(strconv.FormatFloat(time.Since(t.Session.LastAccessedAt).Seconds(), 'f', 0, 64))
s.WriteString(" ")

// // flags

Check failure on line 149 in internal/iothread/iothread.go

View workflow job for this annotation

GitHub Actions / lint

commentedOutCode: may want to remove commented-out code (gocritic)
// s.WriteString("flags=")
// s.WriteString("")
// s.WriteString(" ")

// argv-mem

Check failure on line 154 in internal/iothread/iothread.go

View workflow job for this annotation

GitHub Actions / lint

commentedOutCode: may want to remove commented-out code (gocritic)
// s.WriteString("argv-mem=")
// s.WriteString(strconv.FormatInt(int64(c.ArgLenSum), 10))
// s.WriteString(" ")

// cmd
s.WriteString("cmd=")
// todo: handle `CLIENT ID` as "client|id" and `SET k 1` as "set"
if t.lastCmd == nil {
s.WriteString("NULL")
} else {
s.WriteString(strings.ToLower(t.lastCmd.Cmd))
}
s.WriteString(" ")

return s.String()
}

func (t *BaseIOThread) Start(ctx context.Context) error {
errChan := make(chan error, 1)
incomingDataChan := make(chan []byte)
Expand Down Expand Up @@ -603,6 +697,7 @@ func (t *BaseIOThread) isAuthenticated(diceDBCmd *cmd.DiceDBCmd) error {
func (t *BaseIOThread) Stop() error {
slog.Info("Stopping io-thread", slog.String("id", t.id))
t.Session.Expire()
eval.RemoveClientByID(t.id)
return nil
}

Expand Down
9 changes: 5 additions & 4 deletions internal/iothread/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"sync/atomic"

"github.com/dicedb/dice/internal/global"
"github.com/dicedb/dice/internal/shard"
)

Expand All @@ -28,7 +29,7 @@ func NewManager(maxClients int32, sm *shard.ShardManager) *Manager {
}
}

func (m *Manager) RegisterIOThread(ioThread IOThread) error {
func (m *Manager) RegisterIOThread(ioThread global.IOThread) error {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -54,17 +55,17 @@ func (m *Manager) IOThreadCount() int32 {
return m.numIOThreads.Load()
}

func (m *Manager) GetIOThread(id string) (IOThread, bool) {
func (m *Manager) GetIOThread(id string) (global.IOThread, bool) {
client, ok := m.connectedClients.Load(id)
if !ok {
return nil, false
}
return client.(IOThread), true
return client.(global.IOThread), true
}

func (m *Manager) UnregisterIOThread(id string) error {
if client, loaded := m.connectedClients.LoadAndDelete(id); loaded {
w := client.(IOThread)
w := client.(global.IOThread)
if err := w.Stop(); err != nil {
return err
}
Expand Down

0 comments on commit d623913

Please sign in to comment.