diff --git a/cmd/run.go b/cmd/run.go index 31bfe75b..d18d4d23 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -696,6 +696,8 @@ var runCmd = &cobra.Command{ // Add clients to the pool. for range currentPoolSize { clientConfig := clients[configGroupName][configBlockName] + clientConfig.GroupName = configGroupName + clientConfig.BlockName = configBlockName client := network.NewClient( runCtx, clientConfig, logger, network.NewRetry( @@ -716,6 +718,7 @@ var runCmd = &cobra.Command{ if client != nil { eventOptions := trace.WithAttributes( attribute.String("name", configBlockName), + attribute.String("group", configGroupName), attribute.String("network", client.Network), attribute.String("address", client.Address), attribute.Int("receiveChunkSize", client.ReceiveChunkSize), @@ -746,6 +749,8 @@ var runCmd = &cobra.Command{ clientCfg := map[string]interface{}{ "id": client.ID, + "name": configBlockName, + "group": configGroupName, "network": client.Network, "address": client.Address, "receiveChunkSize": client.ReceiveChunkSize, @@ -851,7 +856,8 @@ var runCmd = &cobra.Command{ proxies[configGroupName][configBlockName] = network.NewProxy( runCtx, network.Proxy{ - Name: configBlockName, + GroupName: configGroupName, + BlockName: configBlockName, AvailableConnections: pools[configGroupName][configBlockName], PluginRegistry: pluginRegistry, HealthCheckPeriod: cfg.HealthCheckPeriod, @@ -899,8 +905,9 @@ var runCmd = &cobra.Command{ servers[name] = network.NewServer( runCtx, network.Server{ - Network: cfg.Network, - Address: cfg.Address, + GroupName: name, + Network: cfg.Network, + Address: cfg.Address, TickInterval: config.If( cfg.TickInterval > 0, cfg.TickInterval, diff --git a/config/types.go b/config/types.go index 797bdf96..d065fc57 100644 --- a/config/types.go +++ b/config/types.go @@ -44,6 +44,9 @@ type ActionRedisConfig struct { } type Client struct { + BlockName string `json:"-"` + GroupName string `json:"-"` + Network string `json:"network" jsonschema:"enum=tcp,enum=udp,enum=unix" yaml:"network"` Address string `json:"address" yaml:"address"` TCPKeepAlive bool `json:"tcpKeepAlive" yaml:"tcpKeepAlive"` diff --git a/gatewayd.yaml b/gatewayd.yaml index fbb8452e..d1b05072 100644 --- a/gatewayd.yaml +++ b/gatewayd.yaml @@ -85,7 +85,7 @@ servers: # Load balancer strategies can be found in config/constants.go strategy: ROUND_ROBIN # ROUND_ROBIN, RANDOM, WEIGHTED_ROUND_ROBIN consistentHash: - useSourceIp: true + useSourceIp: true # Set to false for using the RANDOM strategy # Optional configuration for strategies that support rules (e.g., WEIGHTED_ROUND_ROBIN) # loadBalancingRules: # - condition: "DEFAULT" # Currently, only the "DEFAULT" condition is supported diff --git a/metrics/builtins.go b/metrics/builtins.go index 85f488ff..0e48bdae 100644 --- a/metrics/builtins.go +++ b/metrics/builtins.go @@ -10,51 +10,51 @@ const ( ) var ( - ClientConnections = promauto.NewGauge(prometheus.GaugeOpts{ + ClientConnections = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: Namespace, Name: "client_connections", Help: "Number of client connections", - }) - ServerConnections = promauto.NewGauge(prometheus.GaugeOpts{ + }, []string{"group", "block"}) + ServerConnections = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: Namespace, Name: "server_connections", Help: "Number of server connections", - }) - TLSConnections = promauto.NewGauge(prometheus.GaugeOpts{ + }, []string{"group", "block"}) + TLSConnections = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: Namespace, Name: "tls_connections", Help: "Number of TLS connections", - }) + }, []string{"group", "block"}) ServerTicksFired = promauto.NewCounter(prometheus.CounterOpts{ Namespace: Namespace, Name: "server_ticks_fired_total", Help: "Total number of server ticks fired", }) - BytesReceivedFromClient = promauto.NewSummary(prometheus.SummaryOpts{ + BytesReceivedFromClient = promauto.NewSummaryVec(prometheus.SummaryOpts{ Namespace: Namespace, Name: "bytes_received_from_client", Help: "Number of bytes received from client", - }) - BytesSentToServer = promauto.NewSummary(prometheus.SummaryOpts{ + }, []string{"group", "block"}) + BytesSentToServer = promauto.NewSummaryVec(prometheus.SummaryOpts{ Namespace: Namespace, Name: "bytes_sent_to_server", Help: "Number of bytes sent to server", - }) - BytesReceivedFromServer = promauto.NewSummary(prometheus.SummaryOpts{ + }, []string{"group", "block"}) + BytesReceivedFromServer = promauto.NewSummaryVec(prometheus.SummaryOpts{ Namespace: Namespace, Name: "bytes_received_from_server", Help: "Number of bytes received from server", - }) - BytesSentToClient = promauto.NewSummary(prometheus.SummaryOpts{ + }, []string{"group", "block"}) + BytesSentToClient = promauto.NewSummaryVec(prometheus.SummaryOpts{ Namespace: Namespace, Name: "bytes_sent_to_client", Help: "Number of bytes sent to client", - }) - TotalTrafficBytes = promauto.NewSummary(prometheus.SummaryOpts{ + }, []string{"group", "block"}) + TotalTrafficBytes = promauto.NewSummaryVec(prometheus.SummaryOpts{ Namespace: Namespace, Name: "traffic_bytes", Help: "Number of total bytes passed through GatewayD via client or server", - }) + }, []string{"group", "block"}) PluginsLoaded = promauto.NewCounter(prometheus.CounterOpts{ Namespace: Namespace, Name: "plugins_loaded_total", @@ -70,31 +70,31 @@ var ( Name: "plugin_hooks_executed_total", Help: "Number of plugin hooks executed", }) - ProxyHealthChecks = promauto.NewCounter(prometheus.CounterOpts{ + ProxyHealthChecks = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: Namespace, Name: "proxy_health_checks_total", Help: "Number of proxy health checks", - }) - ProxiedConnections = promauto.NewGauge(prometheus.GaugeOpts{ + }, []string{"group", "block"}) + ProxiedConnections = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: Namespace, Name: "proxied_connections", Help: "Number of proxy connects", - }) - ProxyPassThroughsToClient = promauto.NewCounter(prometheus.CounterOpts{ + }, []string{"group", "block"}) + ProxyPassThroughsToClient = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: Namespace, Name: "proxy_passthroughs_to_client_total", Help: "Number of successful proxy passthroughs from server to client", - }) - ProxyPassThroughsToServer = promauto.NewCounter(prometheus.CounterOpts{ + }, []string{"group", "block"}) + ProxyPassThroughsToServer = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: Namespace, Name: "proxy_passthroughs_to_server_total", Help: "Number of successful proxy passthroughs from client to server", - }) - ProxyPassThroughTerminations = promauto.NewCounter(prometheus.CounterOpts{ + }, []string{"group", "block"}) + ProxyPassThroughTerminations = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: Namespace, Name: "proxy_passthrough_terminations_total", Help: "Number of proxy passthrough terminations by plugins", - }) + }, []string{"group", "block"}) APIRequests = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: Namespace, Name: "api_requests_total", diff --git a/network/client.go b/network/client.go index 3d859599..6d8a2537 100644 --- a/network/client.go +++ b/network/client.go @@ -35,6 +35,9 @@ type Client struct { mu sync.Mutex retry IRetry + GroupName string + BlockName string + TCPKeepAlive bool TCPKeepAlivePeriod time.Duration ReceiveChunkSize int @@ -168,7 +171,7 @@ func NewClient( logger, ) - metrics.ServerConnections.Inc() + metrics.ServerConnections.WithLabelValues(clientConfig.GroupName, clientConfig.BlockName).Inc() return &client } @@ -267,7 +270,7 @@ func (c *Client) Reconnect() error { if c.conn != nil { c.Close() } else { - metrics.ServerConnections.Dec() + metrics.ServerConnections.WithLabelValues(c.GroupName, c.BlockName).Dec() } c.connected.Store(false) @@ -306,7 +309,7 @@ func (c *Client) Reconnect() error { ) c.connected.Store(true) c.logger.Debug().Str("address", c.Address).Msg("Reconnected to server") - metrics.ServerConnections.Inc() + metrics.ServerConnections.WithLabelValues(c.GroupName, c.BlockName).Inc() span.AddEvent("Reconnected to server") return nil @@ -343,7 +346,7 @@ func (c *Client) Close() { c.Address = "" c.Network = "" - metrics.ServerConnections.Dec() + metrics.ServerConnections.WithLabelValues(c.GroupName, c.BlockName).Dec() span.AddEvent("Closed connection to server") } diff --git a/network/network_helpers_test.go b/network/network_helpers_test.go index 66682ce0..2ab4d250 100644 --- a/network/network_helpers_test.go +++ b/network/network_helpers_test.go @@ -290,11 +290,15 @@ func (m MockProxy) BusyConnectionsString() []string { return nil } -// GetName returns the name of the MockProxy. -func (m MockProxy) GetName() string { +// GetBlockName returns the name of the MockProxy. +func (m MockProxy) GetBlockName() string { return m.name } +func (m MockProxy) GetGroupName() string { + return "default" +} + // Mock implementation of IConnWrapper. type MockConnWrapper struct { mock.Mock diff --git a/network/proxy.go b/network/proxy.go index b0a0ddaf..aa9971c7 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -26,6 +26,7 @@ import ( "golang.org/x/exp/maps" ) +//nolint:interfacebloat type IProxy interface { Connect(conn *ConnWrapper) *gerr.GatewayDError Disconnect(conn *ConnWrapper) *gerr.GatewayDError @@ -36,11 +37,13 @@ type IProxy interface { Shutdown() AvailableConnectionsString() []string BusyConnectionsString() []string - GetName() string + GetGroupName() string + GetBlockName() string } type Proxy struct { - Name string + GroupName string + BlockName string AvailableConnections pool.IPool busyConnections pool.IPool Logger zerolog.Logger @@ -65,6 +68,8 @@ func NewProxy( defer span.End() proxy := Proxy{ + GroupName: pxy.GroupName, + BlockName: pxy.BlockName, AvailableConnections: pxy.AvailableConnections, busyConnections: pool.NewPool(proxyCtx, config.EmptyPoolCapacity), Logger: pxy.Logger, @@ -118,7 +123,8 @@ func NewProxy( }) proxy.Logger.Trace().Str("duration", time.Since(now).String()).Msg( "Finished the client health check") - metrics.ProxyHealthChecks.Inc() + metrics.ProxyHealthChecks.WithLabelValues( + proxy.GetGroupName(), proxy.GetBlockName()).Inc() }, ); err != nil { proxy.Logger.Error().Err(err).Msg("Failed to schedule the client health check") @@ -138,8 +144,12 @@ func NewProxy( return &proxy } -func (pr *Proxy) GetName() string { - return pr.Name +func (pr *Proxy) GetBlockName() string { + return pr.BlockName +} + +func (pr *Proxy) GetGroupName() string { + return pr.GroupName } // Connect maps a server connection from the available connection pool to a incoming connection. @@ -181,7 +191,7 @@ func (pr *Proxy) Connect(conn *ConnWrapper) *gerr.GatewayDError { return err } - metrics.ProxiedConnections.Inc() + metrics.ProxiedConnections.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Inc() fields := map[string]interface{}{ "function": "proxy.connect", @@ -244,7 +254,7 @@ func (pr *Proxy) Disconnect(conn *ConnWrapper) *gerr.GatewayDError { return gerr.ErrCastFailed } - metrics.ProxiedConnections.Dec() + metrics.ProxiedConnections.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Dec() pr.Logger.Debug().Fields( map[string]interface{}{ @@ -354,7 +364,7 @@ func (pr *Proxy) PassThroughToServer(conn *ConnWrapper, stack *Stack) *gerr.Gate }, ).Msg("Performed the TLS handshake") span.AddEvent("Performed the TLS handshake") - metrics.TLSConnections.Inc() + metrics.TLSConnections.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Inc() } else { pr.Logger.Error().Fields( map[string]interface{}{ @@ -410,10 +420,10 @@ func (pr *Proxy) PassThroughToServer(conn *ConnWrapper, stack *Stack) *gerr.Gate } if modResponse, modReceived := pr.getPluginModifiedResponse(result); modResponse != nil { - metrics.ProxyPassThroughsToClient.Inc() - metrics.ProxyPassThroughTerminations.Inc() - metrics.BytesSentToClient.Observe(float64(modReceived)) - metrics.TotalTrafficBytes.Observe(float64(modReceived)) + metrics.ProxyPassThroughsToClient.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Inc() + metrics.ProxyPassThroughTerminations.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Inc() + metrics.BytesSentToClient.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(modReceived)) + metrics.TotalTrafficBytes.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(modReceived)) span.AddEvent("Terminating connection") @@ -460,7 +470,7 @@ func (pr *Proxy) PassThroughToServer(conn *ConnWrapper, stack *Stack) *gerr.Gate } span.AddEvent("Ran the OnTrafficToServer hooks") - metrics.ProxyPassThroughsToServer.Inc() + metrics.ProxyPassThroughsToServer.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Inc() return nil } @@ -597,7 +607,7 @@ func (pr *Proxy) PassThroughToClient(conn *ConnWrapper, stack *Stack) *gerr.Gate span.RecordError(errVerdict) } - metrics.ProxyPassThroughsToClient.Inc() + metrics.ProxyPassThroughsToClient.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Inc() return errVerdict } @@ -716,8 +726,8 @@ func (pr *Proxy) receiveTrafficFromClient(conn net.Conn) ([]byte, *gerr.GatewayD pr.Logger.Debug().Err(err).Msg("Error reading from client") span.RecordError(err) - metrics.BytesReceivedFromClient.Observe(float64(read)) - metrics.TotalTrafficBytes.Observe(float64(read)) + metrics.BytesReceivedFromClient.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(read)) + metrics.TotalTrafficBytes.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(read)) return chunk[:read], gerr.ErrReadFailed.Wrap(err) } @@ -744,8 +754,8 @@ func (pr *Proxy) receiveTrafficFromClient(conn net.Conn) ([]byte, *gerr.GatewayD span.AddEvent("Received data from client") - metrics.BytesReceivedFromClient.Observe(float64(total)) - metrics.TotalTrafficBytes.Observe(float64(total)) + metrics.BytesReceivedFromClient.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(total)) + metrics.TotalTrafficBytes.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(total)) return buffer.Bytes(), nil } @@ -777,8 +787,8 @@ func (pr *Proxy) sendTrafficToServer(client *Client, request []byte) (int, *gerr span.AddEvent("Sent data to database") - metrics.BytesSentToServer.Observe(float64(sent)) - metrics.TotalTrafficBytes.Observe(float64(sent)) + metrics.BytesSentToServer.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(sent)) + metrics.TotalTrafficBytes.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(sent)) return sent, err } @@ -806,8 +816,8 @@ func (pr *Proxy) receiveTrafficFromServer(client *Client) (int, []byte, *gerr.Ga span.AddEvent("Received data from database") - metrics.BytesReceivedFromServer.Observe(float64(received)) - metrics.TotalTrafficBytes.Observe(float64(received)) + metrics.BytesReceivedFromServer.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(received)) + metrics.TotalTrafficBytes.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(received)) return received, response, err } @@ -847,8 +857,8 @@ func (pr *Proxy) sendTrafficToClient( span.AddEvent("Sent data to client") - metrics.BytesSentToClient.Observe(float64(received)) - metrics.TotalTrafficBytes.Observe(float64(received)) + metrics.BytesSentToClient.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(received)) + metrics.TotalTrafficBytes.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(received)) return nil } diff --git a/network/roundrobin_test.go b/network/roundrobin_test.go index 6f6054e6..259e8c56 100644 --- a/network/roundrobin_test.go +++ b/network/roundrobin_test.go @@ -44,8 +44,8 @@ func TestRoundRobin_NextProxy(t *testing.T) { if !ok { t.Fatalf("test %d: expected proxy of type MockProxy, got %T", testIndex, proxy) } - if mockProxy.GetName() != expected { - t.Errorf("test %d: expected proxy name %s, got %s", testIndex, expected, mockProxy.GetName()) + if mockProxy.GetBlockName() != expected { + t.Errorf("test %d: expected proxy name %s, got %s", testIndex, expected, mockProxy.GetBlockName()) } } } diff --git a/network/server.go b/network/server.go index 210931a9..ec919d72 100644 --- a/network/server.go +++ b/network/server.go @@ -55,6 +55,8 @@ type Server struct { PluginTimeout time.Duration mu *sync.RWMutex + GroupName string + Network string // tcp/udp/unix Address string Options Option @@ -201,7 +203,7 @@ func (s *Server) OnOpen(conn *ConnWrapper) ([]byte, Action) { } span.AddEvent("Ran the OnOpened hooks") - metrics.ClientConnections.Inc() + metrics.ClientConnections.WithLabelValues(s.GroupName, proxy.GetBlockName()).Inc() return nil, None } @@ -265,7 +267,7 @@ func (s *Server) OnClose(conn *ConnWrapper, err error) Action { s.RemoveConnectionFromMap(conn) if conn.IsTLSEnabled() { - metrics.TLSConnections.Dec() + metrics.TLSConnections.WithLabelValues(s.GroupName, proxy.GetBlockName()).Dec() } // Close the incoming connection. @@ -297,7 +299,7 @@ func (s *Server) OnClose(conn *ConnWrapper, err error) Action { } span.AddEvent("Ran the OnClosed hooks") - metrics.ClientConnections.Dec() + metrics.ClientConnections.WithLabelValues(s.GroupName, proxy.GetBlockName()).Dec() return Close } @@ -679,6 +681,7 @@ func NewServer( // Create the server. server := Server{ ctx: serverCtx, + GroupName: srv.GroupName, Network: srv.Network, Address: srv.Address, Options: srv.Options, diff --git a/network/weightedroundrobin.go b/network/weightedroundrobin.go index 0b10fcd8..0385761f 100644 --- a/network/weightedroundrobin.go +++ b/network/weightedroundrobin.go @@ -80,7 +80,7 @@ func (r *WeightedRoundRobin) NextProxy(_ IConnWrapper) (IProxy, *gerr.GatewayDEr // findProxyByName locates a proxy by its name in the provided list of proxies. func findProxyByName(name string, proxies []IProxy) IProxy { for _, proxy := range proxies { - if proxy.GetName() == name { + if proxy.GetBlockName() == name { return proxy } } diff --git a/network/weightedroundrobin_test.go b/network/weightedroundrobin_test.go index 505bafa6..2222b9c9 100644 --- a/network/weightedroundrobin_test.go +++ b/network/weightedroundrobin_test.go @@ -138,7 +138,7 @@ func TestWeightedRoundRobinNextProxy(t *testing.T) { mockProxy, ok := proxy.(MockProxy) require.True(t, ok, "expected proxy of type MockProxy, got %T", proxy) - counts[mockProxy.GetName()]++ + counts[mockProxy.GetBlockName()]++ } // Validate that the actual distribution of requests closely matches the expected distribution. @@ -191,7 +191,7 @@ func TestWeightedRoundRobinConcurrentAccess(t *testing.T) { if assert.Nil(t, err, "No error expected when getting a proxy") { // Safely update the proxy selection count using a mutex. mux.Lock() - proxySelection[proxy.GetName()]++ + proxySelection[proxy.GetBlockName()]++ mux.Unlock() } }()