Skip to content

Commit

Permalink
WIP: refactor server and proxy and remove gnet/v2
Browse files Browse the repository at this point in the history
  • Loading branch information
mostafa committed Oct 7, 2023
1 parent 38bbb86 commit 186a038
Show file tree
Hide file tree
Showing 8 changed files with 503 additions and 189 deletions.
49 changes: 24 additions & 25 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
usage "github.com/gatewayd-io/gatewayd/usagereport/v1"
"github.com/getsentry/sentry-go"
"github.com/go-co-op/gocron"
"github.com/panjf2000/gnet/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -633,30 +632,30 @@ var runCmd = &cobra.Command{
cfg.Network,
cfg.Address,
cfg.GetTickInterval(),
[]gnet.Option{
// Scheduling options
gnet.WithMulticore(cfg.MultiCore),
gnet.WithLockOSThread(cfg.LockOSThread),
// NumEventLoop overrides Multicore option.
// gnet.WithNumEventLoop(1),

// Can be used to send keepalive messages to the client.
gnet.WithTicker(cfg.EnableTicker),

// Internal event-loop load balancing options
gnet.WithLoadBalancing(cfg.GetLoadBalancer()),

// Buffer options
gnet.WithReadBufferCap(cfg.ReadBufferCap),
gnet.WithWriteBufferCap(cfg.WriteBufferCap),
gnet.WithSocketRecvBuffer(cfg.SocketRecvBuffer),
gnet.WithSocketSendBuffer(cfg.SocketSendBuffer),

// TCP options
gnet.WithReuseAddr(cfg.ReuseAddress),
gnet.WithReusePort(cfg.ReusePort),
gnet.WithTCPKeepAlive(cfg.TCPKeepAlive),
gnet.WithTCPNoDelay(cfg.GetTCPNoDelay()),
[]network.Option{
// // Scheduling options
// gnet.WithMulticore(cfg.MultiCore),
// gnet.WithLockOSThread(cfg.LockOSThread),
// // NumEventLoop overrides Multicore option.
// // gnet.WithNumEventLoop(1),

// // Can be used to send keepalive messages to the client.
// gnet.WithTicker(cfg.EnableTicker),

// // Internal event-loop load balancing options
// gnet.WithLoadBalancing(cfg.GetLoadBalancer()),

// // Buffer options
// gnet.WithReadBufferCap(cfg.ReadBufferCap),
// gnet.WithWriteBufferCap(cfg.WriteBufferCap),
// gnet.WithSocketRecvBuffer(cfg.SocketRecvBuffer),
// gnet.WithSocketSendBuffer(cfg.SocketSendBuffer),

// // TCP options
// gnet.WithReuseAddr(cfg.ReuseAddress),
// gnet.WithReusePort(cfg.ReusePort),
// gnet.WithTCPKeepAlive(cfg.TCPKeepAlive),
// gnet.WithTCPNoDelay(cfg.GetTCPNoDelay()),
},
proxies[name],
logger,
Expand Down
2 changes: 1 addition & 1 deletion gatewayd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
loggers:
default:
output: ["console"] # "stdout", "stderr", "syslog", "rsyslog" and "file"
level: "info" # panic, fatal, error, warn, info (default), debug, trace
level: "debug" # panic, fatal, error, warn, info (default), debug, trace
noColor: False
timeFormat: "unix" # unixms, unixmicro and unixnano
consoleTimeFormat: "RFC3339" # Go time format string
Expand Down
2 changes: 1 addition & 1 deletion gatewayd_plugins.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ timeout: 30s
# and should only be used if one only has a single database in their PostgreSQL instance.
plugins:
- name: gatewayd-plugin-cache
enabled: True
enabled: False
localPath: ../gatewayd-plugin-cache/gatewayd-plugin-cache
args: ["--log-level", "debug"]
env:
Expand Down
21 changes: 16 additions & 5 deletions network/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,23 @@ func (c *Client) Send(data []byte) (int, *gerr.GatewayDError) {
_, span := otel.Tracer(config.TracerName).Start(c.ctx, "Send")
defer span.End()

sent, err := c.Conn.Write(data)
if err != nil {
c.logger.Error().Err(err).Msg("Couldn't send data to the server")
span.RecordError(err)
return 0, gerr.ErrClientSendFailed.Wrap(err)
sent := 0
received := len(data)
for {
if sent >= received {
break
}

n, err := c.Conn.Write(data)
if err != nil {
c.logger.Error().Err(err).Msg("Couldn't send data to the server")
span.RecordError(err)
return 0, gerr.ErrClientSendFailed.Wrap(err)
}

sent += n
}

c.logger.Debug().Fields(
map[string]interface{}{
"length": sent,
Expand Down
176 changes: 176 additions & 0 deletions network/engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package network

import (
"context"
"net"
"strconv"
"time"

"github.com/rs/zerolog"
)

type Option struct {
Multicore bool
NumEventLoop int
// LB LoadBalancing
ReuseAddr bool
ReusePort bool
MulticastInterfaceIndex int
ReadBufferCap int
WriteBufferCap int
LockOSThread bool
Ticker bool
TCPKeepAlive time.Duration
TCPNoDelay TCPSocketOpt
SocketRecvBuffer int
SocketSendBuffer int
LogPath string
LogLevel zerolog.Level
Logger zerolog.Logger
}

type Action int

const (
None Action = iota
Close
Shutdown
)

type TCPSocketOpt int

const (
TCPNoDelay TCPSocketOpt = iota
TCPDelay
)

type Engine struct {
listener net.Listener
host string
port int
connections map[string]*net.Conn
bufferSize int
handler EventHandler
stopChannel chan struct{}
}

func (engine *Engine) CountConnections() int {
return len(engine.connections)
}

func (engine *Engine) Stop(ctx context.Context) error {
engine.stopChannel <- struct{}{}
return nil
}

type (
EventHandler interface {
OnBoot(eng Engine) (action Action)
OnShutdown(eng Engine)
OnOpen(c net.Conn) (out []byte, action Action)
OnClose(c net.Conn, err error) (action Action)
OnTraffic(c net.Conn) (action Action)
OnTick() (delay time.Duration, action Action)
}

BuiltinEventEngine struct{}
)

// Create a new TCP server.
func Run(network, address string, server *Server, opts ...Option) error {
engine := Engine{
connections: make(map[string]*net.Conn),
stopChannel: make(chan struct{}),
}

if action := server.OnBoot(engine); action != None {
return nil
}
server.logger.Debug().Msg("Server booted")

if ln, err := net.Listen(network, address); err != nil {
server.logger.Error().Err(err).Msg("Failed to listen")
} else {
engine.listener = ln
}
defer engine.listener.Close()
server.logger.Debug().Str("address", engine.listener.Addr().String()).Msg("Server listening")

if engine.listener == nil {
server.logger.Error().Msg("Listener is nil")
return nil
}
server.logger.Debug().Msg("Server started")

if host, port, err := net.SplitHostPort(engine.listener.Addr().String()); err != nil {
server.logger.Error().Err(err).Msg("Failed to split host and port")
return err
} else {
engine.host = host
if engine.port, err = strconv.Atoi(port); err != nil {
server.logger.Error().Err(err).Msg("Failed to convert port to integer")
return err
}
}

for {
// if <-engine.stopChannel == struct{}{} {
// server.logger.Debug().Msg("Server stopped")
// break
// }
server.logger.Debug().Msg("Server tick")

conn, err := engine.listener.Accept()
if err != nil {
server.logger.Error().Err(err).Msg("Failed to accept connection")
return err
}

server.logger.Debug().Str("address", conn.RemoteAddr().String()).Msg("Connection accepted")

if out, action := server.OnOpen(conn); action != None {
conn.Write(out)
conn.Close()
if action == Shutdown {
server.logger.Debug().Str("address", conn.RemoteAddr().String()).Msg(
"Connection closed")
return nil
}
}
server.logger.Debug().Str("address", conn.RemoteAddr().String()).Msg("Connection accepted")

// engine.connections[conn.RemoteAddr().String()] = &conn
go func(server *Server, conn net.Conn) {
for {
// if n, err := conn.Read([]byte{}); n == 0 && err != nil {
// return
// }
// if action := server.OnTraffic(conn); action == Close {
// if action := server.OnClose(conn, err); action == Close {
// conn.Close()
// break
// }
// }
if action := server.OnTraffic(conn); action == Close {
if action := server.OnClose(conn, err); action == Close {
// FIXME: this should be handled by the server
server.logger.Debug().Str("address", conn.RemoteAddr().String()).Msg(
"Connection closed")
conn.Close()
return
}
}
time.Sleep(100 * time.Millisecond)
}
}(server, conn)

// defer delete(engine.connections, conn.RemoteAddr().String())

// if duration, action := server.OnTick(); action == Shutdown {
// return nil
// } else if duration > 0 {
// time.Sleep(duration)
// }
}
// engine.handler.OnShutdown(engine)
}
Loading

0 comments on commit 186a038

Please sign in to comment.