diff --git a/cmd/run.go b/cmd/run.go index 6c756882..17902c2b 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -354,8 +354,8 @@ var runCmd = &cobra.Command{ attribute.String("sendDeadline", client.SendDeadline.String()), attribute.Bool("tcpKeepAlive", client.TCPKeepAlive), attribute.String("tcpKeepAlivePeriod", client.TCPKeepAlivePeriod.String()), - attribute.String("localAddress", client.Conn.LocalAddr().String()), - attribute.String("remoteAddress", client.Conn.RemoteAddr().String()), + attribute.String("localAddress", client.LocalAddr()), + attribute.String("remoteAddress", client.RemoteAddr()), ) if client.ID != "" { eventOptions = trace.WithAttributes( diff --git a/network/client.go b/network/client.go index 5e2ccadf..da24906f 100644 --- a/network/client.go +++ b/network/client.go @@ -19,6 +19,8 @@ type IClient interface { Receive() (int, []byte, *gerr.GatewayDError) Close() IsConnected() bool + RemoteAddr() string + LocalAddr() string } type Client struct { @@ -245,3 +247,21 @@ func (c *Client) IsConnected() bool { return true } + +// RemoteAddr returns the remote address of the client safely. +func (c *Client) RemoteAddr() string { + if c.Conn != nil && c.Conn.RemoteAddr() != nil { + return c.Conn.RemoteAddr().String() + } + + return "" +} + +// LocalAddr returns the local address of the client safely. +func (c *Client) LocalAddr() string { + if c.Conn != nil && c.Conn.LocalAddr() != nil { + return c.Conn.LocalAddr().String() + } + + return "" +} diff --git a/network/proxy.go b/network/proxy.go index 437ad9e6..323d9257 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -173,7 +173,7 @@ func (pr *Proxy) Connect(gconn gnet.Conn) *gerr.GatewayDError { fields := map[string]interface{}{ "function": "proxy.connect", "client": "unknown", - "server": gconn.RemoteAddr().String(), + "server": RemoteAddr(gconn), } if client.ID != "" { fields["client"] = client.ID[:7] @@ -352,12 +352,14 @@ func (pr *Proxy) PassThrough(gconn gnet.Conn) *gerr.GatewayDError { // The connection to the server is closed, so we MUST reconnect, // otherwise the client will be stuck. + // TODO: Fix bug in handling connection close + // See: https://github.com/gatewayd-io/gatewayd/issues/219 if IsConnClosed(received, err) || IsConnTimedOut(err) { pr.logger.Debug().Fields( map[string]interface{}{ "function": "proxy.passthrough", - "local": client.Conn.LocalAddr().String(), - "remote": client.Conn.RemoteAddr().String(), + "local": client.LocalAddr(), + "remote": client.RemoteAddr(), }).Msg("Client disconnected") client.Close() @@ -371,12 +373,14 @@ func (pr *Proxy) PassThrough(gconn gnet.Conn) *gerr.GatewayDError { } // If the response is empty, don't send anything, instead just close the ingress connection. + // TODO: Fix bug in handling connection close + // See: https://github.com/gatewayd-io/gatewayd/issues/219 if received == 0 { pr.logger.Debug().Fields( map[string]interface{}{ "function": "proxy.passthrough", - "local": client.Conn.LocalAddr().String(), - "remote": client.Conn.RemoteAddr().String(), + "local": client.LocalAddr(), + "remote": client.RemoteAddr(), }).Msg("No data to send to client") span.AddEvent("No data to send to client") span.RecordError(err) @@ -533,7 +537,7 @@ func (pr *Proxy) BusyConnections() []string { connections := make([]string, 0) pr.busyConnections.ForEach(func(key, _ interface{}) bool { if gconn, ok := key.(gnet.Conn); ok { - connections = append(connections, gconn.RemoteAddr().String()) + connections = append(connections, RemoteAddr(gconn)) } return true }) @@ -554,8 +558,8 @@ func (pr *Proxy) receiveTrafficFromClient(gconn gnet.Conn) ([]byte, error) { pr.logger.Debug().Fields( map[string]interface{}{ "length": len(request), - "local": gconn.LocalAddr().String(), - "remote": gconn.RemoteAddr().String(), + "local": LocalAddr(gconn), + "remote": RemoteAddr(gconn), }, ).Msg("Received data from client") @@ -581,8 +585,8 @@ func (pr *Proxy) sendTrafficToServer(client *Client, request []byte) (int, *gerr map[string]interface{}{ "function": "proxy.passthrough", "length": sent, - "local": client.Conn.LocalAddr().String(), - "remote": client.Conn.RemoteAddr().String(), + "local": client.LocalAddr(), + "remote": client.RemoteAddr(), }, ).Msg("Sent data to database") @@ -603,8 +607,8 @@ func (pr *Proxy) receiveTrafficFromServer(client *Client) (int, []byte, *gerr.Ga map[string]interface{}{ "function": "proxy.passthrough", "length": received, - "local": client.Conn.LocalAddr().String(), - "remote": client.Conn.RemoteAddr().String(), + "local": client.LocalAddr(), + "remote": client.RemoteAddr(), }, ).Msg("Received data from database") @@ -627,8 +631,8 @@ func (pr *Proxy) sendTrafficToClient( map[string]interface{}{ "function": "proxy.passthrough", "length": received, - "local": gconn.LocalAddr().String(), - "remote": gconn.RemoteAddr().String(), + "local": LocalAddr(gconn), + "remote": RemoteAddr(gconn), }, ).Msg("Sent data to client") span.RecordError(err) diff --git a/network/server.go b/network/server.go index af4fb683..bfa7a48d 100644 --- a/network/server.go +++ b/network/server.go @@ -86,7 +86,7 @@ func (s *Server) OnOpen(gconn gnet.Conn) ([]byte, gnet.Action) { _, span := otel.Tracer("gatewayd").Start(s.ctx, "OnOpen") defer span.End() - s.logger.Debug().Str("from", gconn.RemoteAddr().String()).Msg( + s.logger.Debug().Str("from", RemoteAddr(gconn)).Msg( "GatewayD is opening a connection") pluginTimeoutCtx, cancel := context.WithTimeout(context.Background(), s.pluginTimeout) @@ -94,8 +94,8 @@ func (s *Server) OnOpen(gconn gnet.Conn) ([]byte, gnet.Action) { // Run the OnOpening hooks. onOpeningData := map[string]interface{}{ "client": map[string]interface{}{ - "local": gconn.LocalAddr().String(), - "remote": gconn.RemoteAddr().String(), + "local": LocalAddr(gconn), + "remote": RemoteAddr(gconn), }, } _, err := s.pluginRegistry.Run( @@ -141,8 +141,8 @@ func (s *Server) OnOpen(gconn gnet.Conn) ([]byte, gnet.Action) { // Run the OnOpened hooks. onOpenedData := map[string]interface{}{ "client": map[string]interface{}{ - "local": gconn.LocalAddr().String(), - "remote": gconn.RemoteAddr().String(), + "local": LocalAddr(gconn), + "remote": RemoteAddr(gconn), }, } _, err = s.pluginRegistry.Run( @@ -165,7 +165,7 @@ func (s *Server) OnClose(gconn gnet.Conn, err error) gnet.Action { _, span := otel.Tracer("gatewayd").Start(s.ctx, "OnClose") defer span.End() - s.logger.Debug().Str("from", gconn.RemoteAddr().String()).Msg( + s.logger.Debug().Str("from", RemoteAddr(gconn)).Msg( "GatewayD is closing a connection") pluginTimeoutCtx, cancel := context.WithTimeout(context.Background(), s.pluginTimeout) @@ -173,8 +173,8 @@ func (s *Server) OnClose(gconn gnet.Conn, err error) gnet.Action { // Run the OnClosing hooks. data := map[string]interface{}{ "client": map[string]interface{}{ - "local": gconn.LocalAddr().String(), - "remote": gconn.RemoteAddr().String(), + "local": LocalAddr(gconn), + "remote": RemoteAddr(gconn), }, "error": "", } @@ -208,8 +208,8 @@ func (s *Server) OnClose(gconn gnet.Conn, err error) gnet.Action { // Run the OnClosed hooks. data = map[string]interface{}{ "client": map[string]interface{}{ - "local": gconn.LocalAddr().String(), - "remote": gconn.RemoteAddr().String(), + "local": LocalAddr(gconn), + "remote": RemoteAddr(gconn), }, "error": "", } @@ -240,8 +240,8 @@ func (s *Server) OnTraffic(gconn gnet.Conn) gnet.Action { // Run the OnTraffic hooks. onTrafficData := map[string]interface{}{ "client": map[string]interface{}{ - "local": gconn.LocalAddr().String(), - "remote": gconn.RemoteAddr().String(), + "local": LocalAddr(gconn), + "remote": RemoteAddr(gconn), }, } _, err := s.pluginRegistry.Run( @@ -266,6 +266,8 @@ func (s *Server) OnTraffic(gconn gnet.Conn) gnet.Action { errors.Is(err, gerr.ErrClientReceiveFailed), errors.Is(err, gerr.ErrHookTerminatedConnection), errors.Is(err.Unwrap(), io.EOF): + // TODO: Fix bug in handling connection close + // See: https://github.com/gatewayd-io/gatewayd/issues/219 return gnet.Close } } diff --git a/network/utils.go b/network/utils.go index b1ee7951..0d4a3dac 100644 --- a/network/utils.go +++ b/network/utils.go @@ -58,14 +58,18 @@ func trafficData( fields []Field, err interface{}, ) map[string]interface{} { + if gconn == nil || client == nil { + return nil + } + data := map[string]interface{}{ "client": map[string]interface{}{ - "local": gconn.LocalAddr().String(), - "remote": gconn.RemoteAddr().String(), + "local": LocalAddr(gconn), + "remote": RemoteAddr(gconn), }, "server": map[string]interface{}{ - "local": client.Conn.LocalAddr().String(), - "remote": client.Conn.RemoteAddr().String(), + "local": client.LocalAddr(), + "remote": client.RemoteAddr(), }, "error": "", } @@ -127,3 +131,19 @@ func IsConnTimedOut(err *gerr.GatewayDError) bool { func IsConnClosed(received int, err *gerr.GatewayDError) bool { return received == 0 && err != nil && err.Unwrap() != nil && errors.Is(err.Unwrap(), io.EOF) } + +// LocalAddr returns the local address of the connection. +func LocalAddr(gconn gnet.Conn) string { + if gconn != nil && gconn.LocalAddr() != nil { + return gconn.LocalAddr().String() + } + return "" +} + +// RemoteAddr returns the remote address of the connection. +func RemoteAddr(gconn gnet.Conn) string { + if gconn != nil && gconn.RemoteAddr() != nil { + return gconn.RemoteAddr().String() + } + return "" +}