Skip to content

Commit

Permalink
Merge pull request #344 from gatewayd-io/refactor-server-and-proxy
Browse files Browse the repository at this point in the history
Refactor server and proxy and remove `gnet/v2`
  • Loading branch information
mostafa authored Oct 17, 2023
2 parents 38bbb86 + fc41aed commit 2ef135f
Show file tree
Hide file tree
Showing 29 changed files with 1,190 additions and 637 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ jobs:
name: Test GatewayD Plugins
runs-on: ubuntu-latest
needs: test
timeout-minutes: 3
services:
postgres:
image: postgres
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dist/

# Editor files
.vscode
.idea

# Logs
*.log
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ clean:
test:
@go test -v ./...

test-race:
@go test -race -v ./...

benchmark:
@go test -bench=. -benchmem -run=^# ./...

Expand Down
78 changes: 35 additions & 43 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
usage "github.com/gatewayd-io/gatewayd/usagereport/v1"
"github.com/getsentry/sentry-go"
"github.com/go-co-op/gocron"
"github.com/panjf2000/gnet/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -71,7 +70,6 @@ var (

func StopGracefully(
runCtx context.Context,
pluginTimeoutCtx context.Context,
sig os.Signal,
metricsMerger *metrics.Merger,
metricsServer *http.Server,
Expand All @@ -88,6 +86,10 @@ func StopGracefully(

logger.Info().Msg("Notifying the plugins that the server is shutting down")
if pluginRegistry != nil {
pluginTimeoutCtx, cancel := context.WithTimeout(context.Background(), conf.Plugin.Timeout)
defer cancel()

//nolint:contextcheck
_, err := pluginRegistry.Run(
pluginTimeoutCtx,
map[string]interface{}{"signal": signal},
Expand All @@ -99,11 +101,12 @@ func StopGracefully(
}
}

logger.Info().Msg("Stopping GatewayD")
span.AddEvent("Stopping GatewayD", trace.WithAttributes(
logger.Info().Msg("GatewayD is shutting down")
span.AddEvent("GatewayD is shutting down", trace.WithAttributes(
attribute.String("signal", signal),
))
if healthCheckScheduler != nil {
healthCheckScheduler.Stop()
healthCheckScheduler.Clear()
logger.Info().Msg("Stopped health check scheduler")
span.AddEvent("Stopped health check scheduler")
Expand Down Expand Up @@ -269,7 +272,7 @@ var runCmd = &cobra.Command{
startDelay := time.Now().Add(conf.Plugin.HealthCheckPeriod)
if _, err := healthCheckScheduler.Every(
conf.Plugin.HealthCheckPeriod).SingletonMode().StartAt(startDelay).Do(func() {
_, span = otel.Tracer(config.TracerName).Start(ctx, "Run plugin health check")
_, span := otel.Tracer(config.TracerName).Start(ctx, "Run plugin health check")
defer span.End()

var plugins []string
Expand Down Expand Up @@ -461,6 +464,9 @@ var runCmd = &cobra.Command{
}(conf.Global.Metrics[config.Default], logger)

// This is a notification hook, so we don't care about the result.
pluginTimeoutCtx, cancel = context.WithTimeout(context.Background(), conf.Plugin.Timeout)
defer cancel()

if data, ok := conf.GlobalKoanf.Get("loggers").(map[string]interface{}); ok {
_, err = pluginRegistry.Run(
pluginTimeoutCtx, data, v1.HookName_HOOK_NAME_ON_NEW_LOGGER)
Expand Down Expand Up @@ -527,6 +533,10 @@ var runCmd = &cobra.Command{

span.AddEvent("Create client", eventOptions)

pluginTimeoutCtx, cancel = context.WithTimeout(
context.Background(), conf.Plugin.Timeout)
defer cancel()

clientCfg := map[string]interface{}{
"id": client.ID,
"network": client.Network,
Expand Down Expand Up @@ -571,6 +581,10 @@ var runCmd = &cobra.Command{
os.Exit(gerr.FailedToInitializePool)
}

pluginTimeoutCtx, cancel = context.WithTimeout(
context.Background(), conf.Plugin.Timeout)
defer cancel()

_, err = pluginRegistry.Run(
pluginTimeoutCtx,
map[string]interface{}{"name": name, "size": cfg.GetSize()},
Expand Down Expand Up @@ -610,6 +624,10 @@ var runCmd = &cobra.Command{
attribute.String("healthCheckPeriod", cfg.HealthCheckPeriod.String()),
))

pluginTimeoutCtx, cancel = context.WithTimeout(
context.Background(), conf.Plugin.Timeout)
defer cancel()

if data, ok := conf.GlobalKoanf.Get("proxies").(map[string]interface{}); ok {
_, err = pluginRegistry.Run(
pluginTimeoutCtx, data, v1.HookName_HOOK_NAME_ON_NEW_PROXY)
Expand All @@ -633,30 +651,9 @@ var runCmd = &cobra.Command{
cfg.Network,
cfg.Address,
cfg.GetTickInterval(),
[]gnet.Option{
// Scheduling options
gnet.WithMulticore(cfg.MultiCore),
gnet.WithLockOSThread(cfg.LockOSThread),
// NumEventLoop overrides Multicore option.
// gnet.WithNumEventLoop(1),

network.Option{
// Can be used to send keepalive messages to the client.
gnet.WithTicker(cfg.EnableTicker),

// Internal event-loop load balancing options
gnet.WithLoadBalancing(cfg.GetLoadBalancer()),

// Buffer options
gnet.WithReadBufferCap(cfg.ReadBufferCap),
gnet.WithWriteBufferCap(cfg.WriteBufferCap),
gnet.WithSocketRecvBuffer(cfg.SocketRecvBuffer),
gnet.WithSocketSendBuffer(cfg.SocketSendBuffer),

// TCP options
gnet.WithReuseAddr(cfg.ReuseAddress),
gnet.WithReusePort(cfg.ReusePort),
gnet.WithTCPKeepAlive(cfg.TCPKeepAlive),
gnet.WithTCPNoDelay(cfg.GetTCPNoDelay()),
EnableTicker: cfg.EnableTicker,
},
proxies[name],
logger,
Expand All @@ -669,21 +666,13 @@ var runCmd = &cobra.Command{
attribute.String("network", cfg.Network),
attribute.String("address", cfg.Address),
attribute.String("tickInterval", cfg.TickInterval.String()),
attribute.Bool("multiCore", cfg.MultiCore),
attribute.Bool("lockOSThread", cfg.LockOSThread),
attribute.Bool("enableTicker", cfg.EnableTicker),
attribute.String("loadBalancer", cfg.LoadBalancer),
attribute.Int("readBufferCap", cfg.ReadBufferCap),
attribute.Int("writeBufferCap", cfg.WriteBufferCap),
attribute.Int("socketRecvBuffer", cfg.SocketRecvBuffer),
attribute.Int("socketSendBuffer", cfg.SocketSendBuffer),
attribute.Bool("reuseAddress", cfg.ReuseAddress),
attribute.Bool("reusePort", cfg.ReusePort),
attribute.String("tcpKeepAlive", cfg.TCPKeepAlive.String()),
attribute.Bool("tcpNoDelay", cfg.TCPNoDelay),
attribute.String("pluginTimeout", conf.Plugin.Timeout.String()),
))

pluginTimeoutCtx, cancel = context.WithTimeout(
context.Background(), conf.Plugin.Timeout)
defer cancel()

if data, ok := conf.GlobalKoanf.Get("servers").(map[string]interface{}); ok {
_, err = pluginRegistry.Run(
pluginTimeoutCtx, data, v1.HookName_HOOK_NAME_ON_NEW_SERVER)
Expand Down Expand Up @@ -786,13 +775,15 @@ var runCmd = &cobra.Command{
go func(pluginRegistry *plugin.Registry,
logger zerolog.Logger,
servers map[string]*network.Server,
metricsMerger *metrics.Merger,
metricsServer *http.Server,
stopChan chan struct{},
) {
for sig := range signalsCh {
for _, s := range signals {
if sig != s {
StopGracefully(
runCtx,
pluginTimeoutCtx,
sig,
metricsMerger,
metricsServer,
Expand All @@ -805,13 +796,14 @@ var runCmd = &cobra.Command{
}
}
}
}(pluginRegistry, logger, servers)
}(pluginRegistry, logger, servers, metricsMerger, metricsServer, stopChan)

_, span = otel.Tracer(config.TracerName).Start(runCtx, "Start servers")
// Start the server.
for name, server := range servers {
logger := loggers[name]
go func(
span trace.Span,
server *network.Server,
logger zerolog.Logger,
healthCheckScheduler *gocron.Scheduler,
Expand All @@ -831,7 +823,7 @@ var runCmd = &cobra.Command{
pluginRegistry.Shutdown()
os.Exit(gerr.FailedToStartServer)
}
}(server, logger, healthCheckScheduler, metricsMerger, pluginRegistry)
}(span, server, logger, healthCheckScheduler, metricsMerger, pluginRegistry)
}
span.End()

Expand Down
12 changes: 5 additions & 7 deletions cmd/run_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"os"
"sync"
"testing"
Expand Down Expand Up @@ -29,8 +30,7 @@ func Test_runCmd(t *testing.T) {
time.Sleep(100 * time.Millisecond)

StopGracefully(
runCmd.Context(),
runCmd.Context(),
context.Background(),
nil,
nil,
nil,
Expand Down Expand Up @@ -88,8 +88,7 @@ func Test_runCmdWithMultiTenancy(t *testing.T) {
time.Sleep(500 * time.Millisecond)

StopGracefully(
runCmd.Context(),
runCmd.Context(),
context.Background(),
nil,
nil,
nil,
Expand Down Expand Up @@ -165,11 +164,10 @@ func Test_runCmdWithCachePlugin(t *testing.T) {
var waitGroup sync.WaitGroup
waitGroup.Add(1)
go func(waitGroup *sync.WaitGroup) {
time.Sleep(500 * time.Millisecond)
time.Sleep(time.Second)

StopGracefully(
runCmd.Context(),
runCmd.Context(),
context.Background(),
nil,
nil,
nil,
Expand Down
19 changes: 4 additions & 15 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,21 +129,10 @@ func (c *Config) LoadDefaults(ctx context.Context) {
}

defaultServer := Server{
Network: DefaultListenNetwork,
Address: DefaultListenAddress,
EnableTicker: false,
TickInterval: DefaultTickInterval,
MultiCore: true,
LockOSThread: false,
ReuseAddress: true,
ReusePort: true,
LoadBalancer: DefaultLoadBalancer,
ReadBufferCap: DefaultBufferSize,
WriteBufferCap: DefaultBufferSize,
SocketRecvBuffer: DefaultBufferSize,
SocketSendBuffer: DefaultBufferSize,
TCPKeepAlive: DefaultTCPKeepAliveDuration,
TCPNoDelay: DefaultTCPNoDelay,
Network: DefaultListenNetwork,
Address: DefaultListenAddress,
EnableTicker: false,
TickInterval: DefaultTickInterval,
}

c.globalDefaults = GlobalConfig{
Expand Down
1 change: 1 addition & 0 deletions config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ const (
DefaultTCPKeepAliveDuration = 3 * time.Second
DefaultLoadBalancer = "roundrobin"
DefaultTCPNoDelay = true
DefaultEngineStopTimeout = 5 * time.Second

// Utility constants.
DefaultSeed = 1000
Expand Down
23 changes: 0 additions & 23 deletions config/getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"path/filepath"
"time"

"github.com/panjf2000/gnet/v2"
"github.com/rs/zerolog"
)

Expand All @@ -28,11 +27,6 @@ var (
"continue": Continue,
"stop": Stop,
}
loadBalancers = map[string]gnet.LoadBalancing{
"roundrobin": gnet.RoundRobin,
"leastconnections": gnet.LeastConnections,
"sourceaddrhash": gnet.SourceAddrHash,
}
logOutputs = map[string]LogOutput{
"console": Console,
"stdout": Stdout,
Expand Down Expand Up @@ -166,23 +160,6 @@ func (s Server) GetTickInterval() time.Duration {
return s.TickInterval
}

// GetLoadBalancer returns the load balancing algorithm to use.
func (s Server) GetLoadBalancer() gnet.LoadBalancing {
if lb, ok := loadBalancers[s.LoadBalancer]; ok {
return lb
}
return gnet.RoundRobin
}

// GetTCPNoDelay returns the TCP no delay option from config file.
func (s Server) GetTCPNoDelay() gnet.TCPSocketOpt {
if s.TCPNoDelay {
return gnet.TCPNoDelay
}

return gnet.TCPDelay
}

// GetSize returns the pool size from config file.
func (p Pool) GetSize() int {
if p.Size == 0 {
Expand Down
13 changes: 0 additions & 13 deletions config/getters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"testing"
"time"

"github.com/panjf2000/gnet/v2"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -75,18 +74,6 @@ func TestGetTickInterval(t *testing.T) {
assert.Equal(t, DefaultTickInterval, server.GetTickInterval())
}

// TestGetLoadBalancer tests the GetLoadBalancer function.
func TestGetLoadBalancer(t *testing.T) {
server := Server{}
assert.Equal(t, gnet.RoundRobin, server.GetLoadBalancer())
}

// TestGetTCPNoDelay tests the GetTCPNoDelay function.
func TestGetTCPNoDelay(t *testing.T) {
server := Server{}
assert.Equal(t, gnet.TCPDelay, server.GetTCPNoDelay())
}

// TestGetSize tests the GetSize function.
func TestGetSize(t *testing.T) {
pool := Pool{}
Expand Down
Loading

0 comments on commit 2ef135f

Please sign in to comment.