diff --git a/config/constants.go b/config/constants.go index a40175e8..9cbafb4e 100644 --- a/config/constants.go +++ b/config/constants.go @@ -117,6 +117,7 @@ const ( DefaultTCPKeepAliveDuration = 3 * time.Second DefaultLoadBalancer = "roundrobin" DefaultTCPNoDelay = true + DefaultEngineStopTimeout = 5 * time.Second // Utility constants. DefaultSeed = 1000 diff --git a/config/getters.go b/config/getters.go index da5aff20..0d09ee35 100644 --- a/config/getters.go +++ b/config/getters.go @@ -5,7 +5,6 @@ import ( "path/filepath" "time" - "github.com/panjf2000/gnet/v2" "github.com/rs/zerolog" ) @@ -28,11 +27,6 @@ var ( "continue": Continue, "stop": Stop, } - loadBalancers = map[string]gnet.LoadBalancing{ - "roundrobin": gnet.RoundRobin, - "leastconnections": gnet.LeastConnections, - "sourceaddrhash": gnet.SourceAddrHash, - } logOutputs = map[string]LogOutput{ "console": Console, "stdout": Stdout, diff --git a/errors/errors.go b/errors/errors.go index d884d184..f16a6501 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -21,6 +21,10 @@ const ( ErrCodeClientSendFailed ErrCodeServerReceiveFailed ErrCodeServerSendFailed + ErrCodeServerListenFailed + ErrCodeSplitHostPortFailed + ErrCodeAcceptFailed + ErrCodeReadFailed ErrCodePutFailed ErrCodeCastFailed ErrCodeHookVerificationFailed @@ -77,6 +81,15 @@ var ( ErrCodeServerSendFailed, "couldn't send data to the client", nil) ErrServerReceiveFailed = NewGatewayDError( ErrCodeServerReceiveFailed, "couldn't receive data from the client", nil) + ErrServerListenFailed = NewGatewayDError( + ErrCodeServerListenFailed, "couldn't listen on the server", nil) + ErrSplitHostPortFailed = NewGatewayDError( + ErrCodeSplitHostPortFailed, "failed to split host:port", nil) + ErrAcceptFailed = NewGatewayDError( + ErrCodeAcceptFailed, "failed to accept connection", nil) + + ErrReadFailed = NewGatewayDError( + ErrCodeReadFailed, "failed to read from the client", nil) ErrPutFailed = NewGatewayDError( ErrCodePutFailed, "failed to put in pool", nil) diff --git a/go.mod b/go.mod index 1d614dc6..deba5272 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,6 @@ require ( github.com/invopop/jsonschema v0.8.0 github.com/knadh/koanf v1.5.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/panjf2000/gnet/v2 v2.3.2 github.com/prometheus/client_golang v1.16.0 github.com/prometheus/client_model v0.4.0 github.com/prometheus/common v0.44.0 @@ -70,16 +69,12 @@ require ( github.com/prometheus/procfs v0.11.1 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/valyala/bytebufferpool v1.0.0 // indirect go.opentelemetry.io/otel/metric v1.18.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/atomic v1.11.0 // indirect - go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.26.0 // indirect golang.org/x/crypto v0.13.0 // indirect golang.org/x/net v0.15.0 // indirect golang.org/x/oauth2 v0.12.0 // indirect - golang.org/x/sync v0.3.0 // indirect golang.org/x/sys v0.12.0 // indirect golang.org/x/text v0.13.0 // indirect google.golang.org/appengine v1.6.8 // indirect diff --git a/go.sum b/go.sum index 83411919..23d47e47 100644 --- a/go.sum +++ b/go.sum @@ -274,10 +274,6 @@ github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnu github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA= github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU= -github.com/panjf2000/ants/v2 v2.8.1 h1:C+n/f++aiW8kHCExKlpX6X+okmxKXP7DWLutxuAPuwQ= -github.com/panjf2000/ants/v2 v2.8.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8= -github.com/panjf2000/gnet/v2 v2.3.2 h1:cwzq4S2fZbHvBaGriMlZTDtiFL/EzaaVny2V03wOuj0= -github.com/panjf2000/gnet/v2 v2.3.2/go.mod h1:jQ0+i/ZSs4wxUKl06sgjWE0bL/yWI1d5LkNdqulZXFc= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= @@ -359,8 +355,6 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= -github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= @@ -391,11 +385,7 @@ go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0 go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= -go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= -go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= -go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= -go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY= @@ -458,8 +448,6 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/network/client.go b/network/client.go index 3074b778..de2ef2a8 100644 --- a/network/client.go +++ b/network/client.go @@ -153,14 +153,14 @@ func (c *Client) Send(data []byte) (int, *gerr.GatewayDError) { break } - n, err := c.Conn.Write(data) + written, err := c.Conn.Write(data) if err != nil { c.logger.Error().Err(err).Msg("Couldn't send data to the server") span.RecordError(err) return 0, gerr.ErrClientSendFailed.Wrap(err) } - sent += n + sent += written } c.logger.Debug().Fields( diff --git a/network/engine.go b/network/engine.go index 89144ed2..f38ababb 100644 --- a/network/engine.go +++ b/network/engine.go @@ -5,6 +5,9 @@ import ( "net" "strconv" "time" + + "github.com/gatewayd-io/gatewayd/config" + gerr "github.com/gatewayd-io/gatewayd/errors" ) type Option struct { @@ -39,7 +42,7 @@ func (engine *Engine) CountConnections() int { } func (engine *Engine) Stop(ctx context.Context) error { - ctx, cancel := context.WithDeadline(ctx, time.Now().Add(2*time.Second)) + _, cancel := context.WithDeadline(ctx, time.Now().Add(config.DefaultEngineStopTimeout)) defer cancel() engine.stopServer <- struct{}{} @@ -47,7 +50,7 @@ func (engine *Engine) Stop(ctx context.Context) error { } // Run starts a server and connects all the handlers. -func Run(network, address string, server *Server, opts Option) error { +func Run(network, address string, server *Server) *gerr.GatewayDError { server.engine = Engine{ connections: 0, stopServer: make(chan struct{}), @@ -57,11 +60,11 @@ func Run(network, address string, server *Server, opts Option) error { return nil } - if ln, err := net.Listen(network, address); err != nil { + var err error + server.engine.listener, err = net.Listen(network, address) + if err != nil { server.logger.Error().Err(err).Msg("Server failed to start listening") - return err - } else { - server.engine.listener = ln + return gerr.ErrServerListenFailed.Wrap(err) } defer server.engine.listener.Close() @@ -70,25 +73,22 @@ func Run(network, address string, server *Server, opts Option) error { return nil } - if host, port, err := net.SplitHostPort(server.engine.listener.Addr().String()); err != nil { + var port string + server.engine.host, port, err = net.SplitHostPort(server.engine.listener.Addr().String()) + if err != nil { server.logger.Error().Err(err).Msg("Failed to split host and port") - return err - } else { - server.engine.host = host - if server.engine.port, err = strconv.Atoi(port); err != nil { - server.logger.Error().Err(err).Msg("Failed to convert port to integer") - return err - } + return gerr.ErrSplitHostPortFailed.Wrap(err) + } + + if server.engine.port, err = strconv.Atoi(port); err != nil { + server.logger.Error().Err(err).Msg("Failed to convert port to integer") + return gerr.ErrCastFailed.Wrap(err) } go func(server *Server) { - for { - select { - case <-server.engine.stopServer: - server.OnShutdown(server.engine) - server.logger.Debug().Msg("Server stopped") - } - } + <-server.engine.stopServer + server.OnShutdown(server.engine) + server.logger.Debug().Msg("Server stopped") }(server) go func(server *Server) { @@ -113,11 +113,13 @@ func Run(network, address string, server *Server, opts Option) error { conn, err := server.engine.listener.Accept() if err != nil { server.logger.Error().Err(err).Msg("Failed to accept connection") - return err + return gerr.ErrAcceptFailed.Wrap(err) } if out, action := server.OnOpen(conn); action != None { - conn.Write(out) + if _, err := conn.Write(out); err != nil { + server.logger.Error().Err(err).Msg("Failed to write to connection") + } conn.Close() if action == Shutdown { server.OnShutdown(server.engine) @@ -136,14 +138,10 @@ func Run(network, address string, server *Server, opts Option) error { }(server, conn, stopConnection) go func(server *Server, conn net.Conn, stopConnection chan struct{}) { - for { - select { - case <-stopConnection: - server.engine.connections-- - if action := server.OnClose(conn, err); action == Close { - return - } - } + <-stopConnection + server.engine.connections-- + if action := server.OnClose(conn, err); action == Close { + return } }(server, conn, stopConnection) } diff --git a/network/proxy.go b/network/proxy.go index 943113df..04db6dd9 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -554,7 +554,7 @@ func (pr *Proxy) BusyConnections() []string { } // receiveTrafficFromClient is a function that waits to receive data from the client. -func (pr *Proxy) receiveTrafficFromClient(conn net.Conn) ([]byte, error) { +func (pr *Proxy) receiveTrafficFromClient(conn net.Conn) ([]byte, *gerr.GatewayDError) { _, span := otel.Tracer(config.TracerName).Start(pr.ctx, "receiveTrafficFromClient") defer span.End() @@ -571,7 +571,7 @@ func (pr *Proxy) receiveTrafficFromClient(conn net.Conn) ([]byte, error) { metrics.BytesReceivedFromClient.Observe(float64(read)) metrics.TotalTrafficBytes.Observe(float64(read)) - return chunk[:read], err + return chunk[:read], gerr.ErrReadFailed.Wrap(err) } received += read @@ -597,7 +597,6 @@ func (pr *Proxy) receiveTrafficFromClient(conn net.Conn) ([]byte, error) { metrics.BytesReceivedFromClient.Observe(float64(length)) metrics.TotalTrafficBytes.Observe(float64(length)) - //nolint:wrapcheck return buffer.Bytes(), nil } @@ -668,14 +667,14 @@ func (pr *Proxy) sendTrafficToClient( break } - n, origErr := conn.Write(response[:received]) + written, origErr := conn.Write(response[:received]) if origErr != nil { pr.logger.Error().Err(origErr).Msg("Error writing to client") span.RecordError(origErr) return gerr.ErrServerSendFailed.Wrap(origErr) } - sent += n + sent += written } pr.logger.Debug().Fields( diff --git a/network/server.go b/network/server.go index 4ba918e9..ffa365e8 100644 --- a/network/server.go +++ b/network/server.go @@ -371,7 +371,7 @@ func (s *Server) Run() error { } // Start the server. - origErr := Run(s.Network, addr, s, s.Options) + origErr := Run(s.Network, addr, s) if origErr != nil { s.logger.Error().Err(origErr).Msg("Failed to start server") span.RecordError(origErr)