From 0a81e5261116cdabe906c06c3ebf5d104303d9ff Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Tue, 1 Oct 2024 22:19:48 +0200 Subject: [PATCH 1/3] Add group and block names to metrics --- cmd/run.go | 13 +++++-- config/types.go | 3 ++ metrics/builtins.go | 52 +++++++++++++-------------- network/client.go | 11 +++--- network/network_helpers_test.go | 4 +-- network/proxy.go | 57 +++++++++++++++++------------- network/roundrobin_test.go | 4 +-- network/server.go | 9 +++-- network/weightedroundrobin.go | 2 +- network/weightedroundrobin_test.go | 4 +-- 10 files changed, 92 insertions(+), 67 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 31bfe75b..d18d4d23 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -696,6 +696,8 @@ var runCmd = &cobra.Command{ // Add clients to the pool. for range currentPoolSize { clientConfig := clients[configGroupName][configBlockName] + clientConfig.GroupName = configGroupName + clientConfig.BlockName = configBlockName client := network.NewClient( runCtx, clientConfig, logger, network.NewRetry( @@ -716,6 +718,7 @@ var runCmd = &cobra.Command{ if client != nil { eventOptions := trace.WithAttributes( attribute.String("name", configBlockName), + attribute.String("group", configGroupName), attribute.String("network", client.Network), attribute.String("address", client.Address), attribute.Int("receiveChunkSize", client.ReceiveChunkSize), @@ -746,6 +749,8 @@ var runCmd = &cobra.Command{ clientCfg := map[string]interface{}{ "id": client.ID, + "name": configBlockName, + "group": configGroupName, "network": client.Network, "address": client.Address, "receiveChunkSize": client.ReceiveChunkSize, @@ -851,7 +856,8 @@ var runCmd = &cobra.Command{ proxies[configGroupName][configBlockName] = network.NewProxy( runCtx, network.Proxy{ - Name: configBlockName, + GroupName: configGroupName, + BlockName: configBlockName, AvailableConnections: pools[configGroupName][configBlockName], PluginRegistry: pluginRegistry, HealthCheckPeriod: cfg.HealthCheckPeriod, @@ -899,8 +905,9 @@ var runCmd = &cobra.Command{ servers[name] = network.NewServer( runCtx, network.Server{ - Network: cfg.Network, - Address: cfg.Address, + GroupName: name, + Network: cfg.Network, + Address: cfg.Address, TickInterval: config.If( cfg.TickInterval > 0, cfg.TickInterval, diff --git a/config/types.go b/config/types.go index 797bdf96..d065fc57 100644 --- a/config/types.go +++ b/config/types.go @@ -44,6 +44,9 @@ type ActionRedisConfig struct { } type Client struct { + BlockName string `json:"-"` + GroupName string `json:"-"` + Network string `json:"network" jsonschema:"enum=tcp,enum=udp,enum=unix" yaml:"network"` Address string `json:"address" yaml:"address"` TCPKeepAlive bool `json:"tcpKeepAlive" yaml:"tcpKeepAlive"` diff --git a/metrics/builtins.go b/metrics/builtins.go index 85f488ff..0e48bdae 100644 --- a/metrics/builtins.go +++ b/metrics/builtins.go @@ -10,51 +10,51 @@ const ( ) var ( - ClientConnections = promauto.NewGauge(prometheus.GaugeOpts{ + ClientConnections = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: Namespace, Name: "client_connections", Help: "Number of client connections", - }) - ServerConnections = promauto.NewGauge(prometheus.GaugeOpts{ + }, []string{"group", "block"}) + ServerConnections = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: Namespace, Name: "server_connections", Help: "Number of server connections", - }) - TLSConnections = promauto.NewGauge(prometheus.GaugeOpts{ + }, []string{"group", "block"}) + TLSConnections = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: Namespace, Name: "tls_connections", Help: "Number of TLS connections", - }) + }, []string{"group", "block"}) ServerTicksFired = promauto.NewCounter(prometheus.CounterOpts{ Namespace: Namespace, Name: "server_ticks_fired_total", Help: "Total number of server ticks fired", }) - BytesReceivedFromClient = promauto.NewSummary(prometheus.SummaryOpts{ + BytesReceivedFromClient = promauto.NewSummaryVec(prometheus.SummaryOpts{ Namespace: Namespace, Name: "bytes_received_from_client", Help: "Number of bytes received from client", - }) - BytesSentToServer = promauto.NewSummary(prometheus.SummaryOpts{ + }, []string{"group", "block"}) + BytesSentToServer = promauto.NewSummaryVec(prometheus.SummaryOpts{ Namespace: Namespace, Name: "bytes_sent_to_server", Help: "Number of bytes sent to server", - }) - BytesReceivedFromServer = promauto.NewSummary(prometheus.SummaryOpts{ + }, []string{"group", "block"}) + BytesReceivedFromServer = promauto.NewSummaryVec(prometheus.SummaryOpts{ Namespace: Namespace, Name: "bytes_received_from_server", Help: "Number of bytes received from server", - }) - BytesSentToClient = promauto.NewSummary(prometheus.SummaryOpts{ + }, []string{"group", "block"}) + BytesSentToClient = promauto.NewSummaryVec(prometheus.SummaryOpts{ Namespace: Namespace, Name: "bytes_sent_to_client", Help: "Number of bytes sent to client", - }) - TotalTrafficBytes = promauto.NewSummary(prometheus.SummaryOpts{ + }, []string{"group", "block"}) + TotalTrafficBytes = promauto.NewSummaryVec(prometheus.SummaryOpts{ Namespace: Namespace, Name: "traffic_bytes", Help: "Number of total bytes passed through GatewayD via client or server", - }) + }, []string{"group", "block"}) PluginsLoaded = promauto.NewCounter(prometheus.CounterOpts{ Namespace: Namespace, Name: "plugins_loaded_total", @@ -70,31 +70,31 @@ var ( Name: "plugin_hooks_executed_total", Help: "Number of plugin hooks executed", }) - ProxyHealthChecks = promauto.NewCounter(prometheus.CounterOpts{ + ProxyHealthChecks = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: Namespace, Name: "proxy_health_checks_total", Help: "Number of proxy health checks", - }) - ProxiedConnections = promauto.NewGauge(prometheus.GaugeOpts{ + }, []string{"group", "block"}) + ProxiedConnections = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: Namespace, Name: "proxied_connections", Help: "Number of proxy connects", - }) - ProxyPassThroughsToClient = promauto.NewCounter(prometheus.CounterOpts{ + }, []string{"group", "block"}) + ProxyPassThroughsToClient = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: Namespace, Name: "proxy_passthroughs_to_client_total", Help: "Number of successful proxy passthroughs from server to client", - }) - ProxyPassThroughsToServer = promauto.NewCounter(prometheus.CounterOpts{ + }, []string{"group", "block"}) + ProxyPassThroughsToServer = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: Namespace, Name: "proxy_passthroughs_to_server_total", Help: "Number of successful proxy passthroughs from client to server", - }) - ProxyPassThroughTerminations = promauto.NewCounter(prometheus.CounterOpts{ + }, []string{"group", "block"}) + ProxyPassThroughTerminations = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: Namespace, Name: "proxy_passthrough_terminations_total", Help: "Number of proxy passthrough terminations by plugins", - }) + }, []string{"group", "block"}) APIRequests = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: Namespace, Name: "api_requests_total", diff --git a/network/client.go b/network/client.go index 3d859599..6d8a2537 100644 --- a/network/client.go +++ b/network/client.go @@ -35,6 +35,9 @@ type Client struct { mu sync.Mutex retry IRetry + GroupName string + BlockName string + TCPKeepAlive bool TCPKeepAlivePeriod time.Duration ReceiveChunkSize int @@ -168,7 +171,7 @@ func NewClient( logger, ) - metrics.ServerConnections.Inc() + metrics.ServerConnections.WithLabelValues(clientConfig.GroupName, clientConfig.BlockName).Inc() return &client } @@ -267,7 +270,7 @@ func (c *Client) Reconnect() error { if c.conn != nil { c.Close() } else { - metrics.ServerConnections.Dec() + metrics.ServerConnections.WithLabelValues(c.GroupName, c.BlockName).Dec() } c.connected.Store(false) @@ -306,7 +309,7 @@ func (c *Client) Reconnect() error { ) c.connected.Store(true) c.logger.Debug().Str("address", c.Address).Msg("Reconnected to server") - metrics.ServerConnections.Inc() + metrics.ServerConnections.WithLabelValues(c.GroupName, c.BlockName).Inc() span.AddEvent("Reconnected to server") return nil @@ -343,7 +346,7 @@ func (c *Client) Close() { c.Address = "" c.Network = "" - metrics.ServerConnections.Dec() + metrics.ServerConnections.WithLabelValues(c.GroupName, c.BlockName).Dec() span.AddEvent("Closed connection to server") } diff --git a/network/network_helpers_test.go b/network/network_helpers_test.go index 66682ce0..d52e6bb6 100644 --- a/network/network_helpers_test.go +++ b/network/network_helpers_test.go @@ -290,8 +290,8 @@ func (m MockProxy) BusyConnectionsString() []string { return nil } -// GetName returns the name of the MockProxy. -func (m MockProxy) GetName() string { +// GetBlockName returns the name of the MockProxy. +func (m MockProxy) GetBlockName() string { return m.name } diff --git a/network/proxy.go b/network/proxy.go index b0a0ddaf..664f0388 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -36,11 +36,13 @@ type IProxy interface { Shutdown() AvailableConnectionsString() []string BusyConnectionsString() []string - GetName() string + GetGroupName() string + GetBlockName() string } type Proxy struct { - Name string + GroupName string + BlockName string AvailableConnections pool.IPool busyConnections pool.IPool Logger zerolog.Logger @@ -65,6 +67,8 @@ func NewProxy( defer span.End() proxy := Proxy{ + GroupName: pxy.GroupName, + BlockName: pxy.BlockName, AvailableConnections: pxy.AvailableConnections, busyConnections: pool.NewPool(proxyCtx, config.EmptyPoolCapacity), Logger: pxy.Logger, @@ -118,7 +122,8 @@ func NewProxy( }) proxy.Logger.Trace().Str("duration", time.Since(now).String()).Msg( "Finished the client health check") - metrics.ProxyHealthChecks.Inc() + metrics.ProxyHealthChecks.WithLabelValues( + proxy.GetGroupName(), proxy.GetBlockName()).Inc() }, ); err != nil { proxy.Logger.Error().Err(err).Msg("Failed to schedule the client health check") @@ -138,8 +143,12 @@ func NewProxy( return &proxy } -func (pr *Proxy) GetName() string { - return pr.Name +func (pr *Proxy) GetBlockName() string { + return pr.BlockName +} + +func (pr *Proxy) GetGroupName() string { + return pr.GroupName } // Connect maps a server connection from the available connection pool to a incoming connection. @@ -181,7 +190,7 @@ func (pr *Proxy) Connect(conn *ConnWrapper) *gerr.GatewayDError { return err } - metrics.ProxiedConnections.Inc() + metrics.ProxiedConnections.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Inc() fields := map[string]interface{}{ "function": "proxy.connect", @@ -244,7 +253,7 @@ func (pr *Proxy) Disconnect(conn *ConnWrapper) *gerr.GatewayDError { return gerr.ErrCastFailed } - metrics.ProxiedConnections.Dec() + metrics.ProxiedConnections.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Dec() pr.Logger.Debug().Fields( map[string]interface{}{ @@ -354,7 +363,7 @@ func (pr *Proxy) PassThroughToServer(conn *ConnWrapper, stack *Stack) *gerr.Gate }, ).Msg("Performed the TLS handshake") span.AddEvent("Performed the TLS handshake") - metrics.TLSConnections.Inc() + metrics.TLSConnections.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Inc() } else { pr.Logger.Error().Fields( map[string]interface{}{ @@ -410,10 +419,10 @@ func (pr *Proxy) PassThroughToServer(conn *ConnWrapper, stack *Stack) *gerr.Gate } if modResponse, modReceived := pr.getPluginModifiedResponse(result); modResponse != nil { - metrics.ProxyPassThroughsToClient.Inc() - metrics.ProxyPassThroughTerminations.Inc() - metrics.BytesSentToClient.Observe(float64(modReceived)) - metrics.TotalTrafficBytes.Observe(float64(modReceived)) + metrics.ProxyPassThroughsToClient.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Inc() + metrics.ProxyPassThroughTerminations.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Inc() + metrics.BytesSentToClient.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(modReceived)) + metrics.TotalTrafficBytes.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(modReceived)) span.AddEvent("Terminating connection") @@ -460,7 +469,7 @@ func (pr *Proxy) PassThroughToServer(conn *ConnWrapper, stack *Stack) *gerr.Gate } span.AddEvent("Ran the OnTrafficToServer hooks") - metrics.ProxyPassThroughsToServer.Inc() + metrics.ProxyPassThroughsToServer.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Inc() return nil } @@ -597,7 +606,7 @@ func (pr *Proxy) PassThroughToClient(conn *ConnWrapper, stack *Stack) *gerr.Gate span.RecordError(errVerdict) } - metrics.ProxyPassThroughsToClient.Inc() + metrics.ProxyPassThroughsToClient.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Inc() return errVerdict } @@ -716,8 +725,8 @@ func (pr *Proxy) receiveTrafficFromClient(conn net.Conn) ([]byte, *gerr.GatewayD pr.Logger.Debug().Err(err).Msg("Error reading from client") span.RecordError(err) - metrics.BytesReceivedFromClient.Observe(float64(read)) - metrics.TotalTrafficBytes.Observe(float64(read)) + metrics.BytesReceivedFromClient.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(read)) + metrics.TotalTrafficBytes.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(read)) return chunk[:read], gerr.ErrReadFailed.Wrap(err) } @@ -744,8 +753,8 @@ func (pr *Proxy) receiveTrafficFromClient(conn net.Conn) ([]byte, *gerr.GatewayD span.AddEvent("Received data from client") - metrics.BytesReceivedFromClient.Observe(float64(total)) - metrics.TotalTrafficBytes.Observe(float64(total)) + metrics.BytesReceivedFromClient.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(total)) + metrics.TotalTrafficBytes.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(total)) return buffer.Bytes(), nil } @@ -777,8 +786,8 @@ func (pr *Proxy) sendTrafficToServer(client *Client, request []byte) (int, *gerr span.AddEvent("Sent data to database") - metrics.BytesSentToServer.Observe(float64(sent)) - metrics.TotalTrafficBytes.Observe(float64(sent)) + metrics.BytesSentToServer.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(sent)) + metrics.TotalTrafficBytes.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(sent)) return sent, err } @@ -806,8 +815,8 @@ func (pr *Proxy) receiveTrafficFromServer(client *Client) (int, []byte, *gerr.Ga span.AddEvent("Received data from database") - metrics.BytesReceivedFromServer.Observe(float64(received)) - metrics.TotalTrafficBytes.Observe(float64(received)) + metrics.BytesReceivedFromServer.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(received)) + metrics.TotalTrafficBytes.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(received)) return received, response, err } @@ -847,8 +856,8 @@ func (pr *Proxy) sendTrafficToClient( span.AddEvent("Sent data to client") - metrics.BytesSentToClient.Observe(float64(received)) - metrics.TotalTrafficBytes.Observe(float64(received)) + metrics.BytesSentToClient.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(received)) + metrics.TotalTrafficBytes.WithLabelValues(pr.GetGroupName(), pr.GetBlockName()).Observe(float64(received)) return nil } diff --git a/network/roundrobin_test.go b/network/roundrobin_test.go index 6f6054e6..259e8c56 100644 --- a/network/roundrobin_test.go +++ b/network/roundrobin_test.go @@ -44,8 +44,8 @@ func TestRoundRobin_NextProxy(t *testing.T) { if !ok { t.Fatalf("test %d: expected proxy of type MockProxy, got %T", testIndex, proxy) } - if mockProxy.GetName() != expected { - t.Errorf("test %d: expected proxy name %s, got %s", testIndex, expected, mockProxy.GetName()) + if mockProxy.GetBlockName() != expected { + t.Errorf("test %d: expected proxy name %s, got %s", testIndex, expected, mockProxy.GetBlockName()) } } } diff --git a/network/server.go b/network/server.go index 210931a9..ec919d72 100644 --- a/network/server.go +++ b/network/server.go @@ -55,6 +55,8 @@ type Server struct { PluginTimeout time.Duration mu *sync.RWMutex + GroupName string + Network string // tcp/udp/unix Address string Options Option @@ -201,7 +203,7 @@ func (s *Server) OnOpen(conn *ConnWrapper) ([]byte, Action) { } span.AddEvent("Ran the OnOpened hooks") - metrics.ClientConnections.Inc() + metrics.ClientConnections.WithLabelValues(s.GroupName, proxy.GetBlockName()).Inc() return nil, None } @@ -265,7 +267,7 @@ func (s *Server) OnClose(conn *ConnWrapper, err error) Action { s.RemoveConnectionFromMap(conn) if conn.IsTLSEnabled() { - metrics.TLSConnections.Dec() + metrics.TLSConnections.WithLabelValues(s.GroupName, proxy.GetBlockName()).Dec() } // Close the incoming connection. @@ -297,7 +299,7 @@ func (s *Server) OnClose(conn *ConnWrapper, err error) Action { } span.AddEvent("Ran the OnClosed hooks") - metrics.ClientConnections.Dec() + metrics.ClientConnections.WithLabelValues(s.GroupName, proxy.GetBlockName()).Dec() return Close } @@ -679,6 +681,7 @@ func NewServer( // Create the server. server := Server{ ctx: serverCtx, + GroupName: srv.GroupName, Network: srv.Network, Address: srv.Address, Options: srv.Options, diff --git a/network/weightedroundrobin.go b/network/weightedroundrobin.go index 0b10fcd8..0385761f 100644 --- a/network/weightedroundrobin.go +++ b/network/weightedroundrobin.go @@ -80,7 +80,7 @@ func (r *WeightedRoundRobin) NextProxy(_ IConnWrapper) (IProxy, *gerr.GatewayDEr // findProxyByName locates a proxy by its name in the provided list of proxies. func findProxyByName(name string, proxies []IProxy) IProxy { for _, proxy := range proxies { - if proxy.GetName() == name { + if proxy.GetBlockName() == name { return proxy } } diff --git a/network/weightedroundrobin_test.go b/network/weightedroundrobin_test.go index 505bafa6..2222b9c9 100644 --- a/network/weightedroundrobin_test.go +++ b/network/weightedroundrobin_test.go @@ -138,7 +138,7 @@ func TestWeightedRoundRobinNextProxy(t *testing.T) { mockProxy, ok := proxy.(MockProxy) require.True(t, ok, "expected proxy of type MockProxy, got %T", proxy) - counts[mockProxy.GetName()]++ + counts[mockProxy.GetBlockName()]++ } // Validate that the actual distribution of requests closely matches the expected distribution. @@ -191,7 +191,7 @@ func TestWeightedRoundRobinConcurrentAccess(t *testing.T) { if assert.Nil(t, err, "No error expected when getting a proxy") { // Safely update the proxy selection count using a mutex. mux.Lock() - proxySelection[proxy.GetName()]++ + proxySelection[proxy.GetBlockName()]++ mux.Unlock() } }() From b3d5d7ba5f6619645fe81ad910368a272602a630 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Tue, 1 Oct 2024 22:24:50 +0200 Subject: [PATCH 2/3] Add comment --- gatewayd.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gatewayd.yaml b/gatewayd.yaml index fbb8452e..d1b05072 100644 --- a/gatewayd.yaml +++ b/gatewayd.yaml @@ -85,7 +85,7 @@ servers: # Load balancer strategies can be found in config/constants.go strategy: ROUND_ROBIN # ROUND_ROBIN, RANDOM, WEIGHTED_ROUND_ROBIN consistentHash: - useSourceIp: true + useSourceIp: true # Set to false for using the RANDOM strategy # Optional configuration for strategies that support rules (e.g., WEIGHTED_ROUND_ROBIN) # loadBalancingRules: # - condition: "DEFAULT" # Currently, only the "DEFAULT" condition is supported From 44667ca0a2d811926cde5342c85e399f32ba8092 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Tue, 1 Oct 2024 22:37:10 +0200 Subject: [PATCH 3/3] Implement missing mock function and ignore linter --- network/network_helpers_test.go | 4 ++++ network/proxy.go | 1 + 2 files changed, 5 insertions(+) diff --git a/network/network_helpers_test.go b/network/network_helpers_test.go index d52e6bb6..2ab4d250 100644 --- a/network/network_helpers_test.go +++ b/network/network_helpers_test.go @@ -295,6 +295,10 @@ func (m MockProxy) GetBlockName() string { return m.name } +func (m MockProxy) GetGroupName() string { + return "default" +} + // Mock implementation of IConnWrapper. type MockConnWrapper struct { mock.Mock diff --git a/network/proxy.go b/network/proxy.go index 664f0388..aa9971c7 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -26,6 +26,7 @@ import ( "golang.org/x/exp/maps" ) +//nolint:interfacebloat type IProxy interface { Connect(conn *ConnWrapper) *gerr.GatewayDError Disconnect(conn *ConnWrapper) *gerr.GatewayDError