diff --git a/pjs/http/codec.js b/pjs/http/codec.js index c70e2d6..caf6815 100644 --- a/pjs/http/codec.js +++ b/pjs/http/codec.js @@ -1,13 +1,10 @@ (( - { - metrics, - metricsCache, - durationCache, - } = pipy.solve('lib/metrics.js'), + { metrics } = pipy.solve('lib/metrics.js'), ) => pipy() .export('http', { __http: null, + __request: null, }) .pipeline() @@ -26,7 +23,14 @@ .demuxHTTP().to( $=>$ .handleMessageStart( - msg => (__http = msg?.head) + msg => ( + __http = msg?.head, + __request = { head: msg?.head, reqTime: Date.now() }, + metrics.fgwHttpRequestsTotal.increase() + ) + ) + .handleMessageEnd( + msg => __request.tail = msg.tail ) .chain() ) diff --git a/pjs/http/metrics.js b/pjs/http/metrics.js index 54ca6fb..5168bdd 100644 --- a/pjs/http/metrics.js +++ b/pjs/http/metrics.js @@ -1,92 +1,86 @@ (( - { - metrics, - metricsCache, - durationCache, - } = pipy.solve('lib/metrics.js'), + { metricsCache } = pipy.solve('lib/metrics.js'), ) => ( pipy({ - _request: null, - _requestTime: null, - _requestSize: 0, _metrics: null, + _route: null, + _consumer: null, + _target: null, + _status: null, + _requestTime: null, + _responseTime: null, }) .import({ + __request: 'http', __domain: 'route', __route: 'route', __service: 'service', - __target: 'connect-tcp', __consumer: 'consumer', + __target: 'connect-tcp', }) .pipeline() .handleMessageStart( - (msg) => ( - _request = msg, - // add HTTP header size - _requestTime = Date.now(), - metrics.fgwHttpRequestsTotal.increase() - ) -) -.handleData( - data => ( - _requestSize += data.size + () => ( + _requestTime = Date.now() ) ) .chain() .handleMessageStart( (msg) => ( - ( - serviceName = __service?.name, - status = msg?.head?.status, - statusClass = Math.floor(status / 100), - durationHist = durationCache.get(serviceName), - ) => ( - durationHist && durationHist.observe(Date.now() - _requestTime), - _metrics = metricsCache.get(serviceName), - _metrics && ( - - _metrics.upstreamCompletedCount.increase(), - _metrics.upstreamResponseTotal.increase(), - status && ( - - _metrics.fgwHttpStatus.withLabels( - status, - __route?.config?.route || '', - __route?.config?.Path?.Path || '', - __domain?.name || '', - __consumer?.name || '', - __target || '', - (_request?.head?.path || '').split('?')[0] - ).increase(), + _route = __route?.config?.route || '', + _consumer = __consumer?.name || '', + _target = __target || '', + _status = msg?.head?.status, + _metrics = metricsCache.get(__service?.name), - _metrics.fgwBandwidth.withLabels( - 'egress', - __route?.config?.route || '', - __consumer?.name || '', - __inbound.remoteAddress || '' - ).increase(_requestSize), - _requestSize = 0, - // add HTTP header size + __request.tail && _metrics.fgwBandwidth.withLabels( + 'egress', + _route, + _consumer, + __inbound.remoteAddress || '' + ).increase(__request.tail.headSize + __request.tail.bodySize), - _metrics.upstreamCodeCount.withLabels(status).increase(), - _metrics.upstreamCodeXCount.withLabels(statusClass).increase(), - _metrics.upstreamResponseCode.withLabels(statusClass).increase() - ) - ) - ) - )() + _status && _metrics.fgwHttpStatus.withLabels( + _status, + _route, + __route?.config?.Path?.Path || '', + __domain?.name || '', + _consumer, + _target + ).increase() + ) ) -.handleData( - data => ( - _metrics && _metrics.fgwBandwidth.withLabels( +.handleMessageEnd( + msg => ( + _responseTime = Date.now(), + _metrics.fgwHttpLatency.withLabels( + _route, + _consumer, + 'upstream', + _target + ).observe(_responseTime - _requestTime), + _metrics.fgwHttpLatency.withLabels( + _route, + _consumer, + 'fgw', + _target + ).observe(_requestTime - __request.reqTime), + _metrics.fgwHttpLatency.withLabels( + _route, + _consumer, + 'request', + _target + ).observe(_responseTime - __request.reqTime), + + msg.tail && _metrics.fgwBandwidth.withLabels( 'ingress', - __route?.config?.route || '', - __consumer?.name || '', + _route, + _consumer, __inbound.remoteAddress || '' - ).increase(data.size) + ).increase(msg.tail.headSize + msg.tail.bodySize) ) ) diff --git a/pjs/lib/metrics.js b/pjs/lib/metrics.js index 7960a25..344d475 100644 --- a/pjs/lib/metrics.js +++ b/pjs/lib/metrics.js @@ -2,15 +2,8 @@ ( config = pipy.solve('config.js'), - { - namespace, - kind, - name, - pod, - } = pipy.solve('lib/utils.js'), - fgwHttpStatus = new stats.Counter('fgw_http_status', [ - 'service', 'code', 'route', 'matched_uri', 'matched_host', 'consumer', 'node', 'path' + 'service', 'code', 'route', 'matched_uri', 'matched_host', 'consumer', 'node' ]), fgwBandwidth = new stats.Counter('fgw_bandwidth', [ @@ -27,112 +20,45 @@ 'name', 'ip', 'port' ]), - sendBytesTotalCounter = new stats.Counter('fgw_service_upstream_cx_tx_bytes_total', [ - 'fgw_service_name' - ]), - receiveBytesTotalCounter = new stats.Counter('fgw_service_upstream_cx_rx_bytes_total', [ - 'fgw_service_name' - ]), - activeConnectionGauge = new stats.Gauge('fgw_service_upstream_cx_active', [ - 'fgw_service_name' - ]), - upstreamCompletedCount = new stats.Counter('fgw_service_external_upstream_rq_completed', [ - 'fgw_service_name' - ]), - destroyRemoteActiveCounter = new stats.Counter('fgw_service_upstream_cx_destroy_remote_with_active_rq', [ - 'fgw_service_name' - ]), - destroyLocalActiveCounter = new stats.Counter('fgw_service_upstream_cx_destroy_local_with_active_rq', [ - 'fgw_service_name' - ]), - connectTimeoutCounter = new stats.Counter('fgw_service_upstream_cx_connect_timeout', [ - 'fgw_service_name' - ]), - pendingFailureEjectCounter = new stats.Counter('fgw_service_upstream_rq_pending_failure_eject', [ - 'fgw_service_name' - ]), - pendingOverflowCounter = new stats.Counter('fgw_service_upstream_rq_pending_overflow', [ - 'fgw_service_name' - ]), - requestTimeoutCounter = new stats.Counter('fgw_service_upstream_rq_timeout', [ - 'fgw_service_name' - ]), - requestReceiveResetCounter = new stats.Counter('fgw_service_upstream_rq_rx_reset', [ - 'fgw_service_name' - ]), - requestSendResetCounter = new stats.Counter('fgw_service_upstream_rq_tx_reset', [ - 'fgw_service_name' - ]), - upstreamCodeCount = new stats.Counter('fgw_service_external_upstream_rq', [ - 'fgw_service_name', - 'fgw_response_code' - ]), - upstreamCodeXCount = new stats.Counter('fgw_service_external_upstream_rq_xx', [ - 'fgw_service_name', - 'fgw_response_code_class' + fgwHttpLatency = new stats.Histogram('fgw_http_latency', [ + 1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 30000, 60000, 300000, 600000, 1800000, 3600000, Infinity + ], [ + 'service', + 'route', + 'consumer', + 'type', + 'node' ]), - upstreamResponseTotal = new stats.Counter('fgw_service_upstream_rq_total', [ - 'source_namespace', - 'source_workload_kind', - 'source_workload_name', - 'source_workload_pod', - 'fgw_service_name' + + sendBytesTotalCounter = new stats.Counter('fgw_upstream_tx_bytes_total', [ + 'service' ]), - upstreamResponseCode = new stats.Counter('fgw_service_upstream_rq_xx', [ - 'source_namespace', - 'source_workload_kind', - 'source_workload_name', - 'source_workload_pod', - 'fgw_service_name', - 'fgw_response_code_class' + + receiveBytesTotalCounter = new stats.Counter('fgw_upstream_rx_bytes_total', [ + 'service' ]), - fgwRequestDurationHist = new stats.Histogram('fgw_request_duration_ms', [ - 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 30000, 60000, 300000, 600000, 1800000, 3600000, Infinity - ], [ - 'source_namespace', - 'source_kind', - 'source_name', - 'source_pod', - 'fgw_service_name' + activeConnectionGauge = new stats.Gauge('fgw_upstream_connection_active', [ + 'service' ]), metrics = { - fgwHttpRequestsTotal, - fgwHttpCurrentConnections, - fgwUpstreamStatus, + fgwHttpRequestsTotal, // codec.js + fgwHttpCurrentConnections, // codec.js + fgwUpstreamStatus, // health-check.js }, metricsCache = new algo.Cache(serviceName => ( { - fgwHttpStatus: fgwHttpStatus.withLabels(serviceName), - fgwBandwidth: fgwBandwidth.withLabels(serviceName), - fgwHttpRequestsTotal, - fgwHttpCurrentConnections, - - sendBytesTotalCounter: sendBytesTotalCounter.withLabels(serviceName), - receiveBytesTotalCounter: receiveBytesTotalCounter.withLabels(serviceName), - activeConnectionGauge: activeConnectionGauge.withLabels(serviceName), - upstreamCompletedCount: upstreamCompletedCount.withLabels(serviceName), - destroyRemoteActiveCounter: destroyRemoteActiveCounter.withLabels(serviceName), - destroyLocalActiveCounter: destroyLocalActiveCounter.withLabels(serviceName), - connectTimeoutCounter: connectTimeoutCounter.withLabels(serviceName), - pendingFailureEjectCounter: pendingFailureEjectCounter.withLabels(serviceName), - pendingOverflowCounter: pendingOverflowCounter.withLabels(serviceName), - requestTimeoutCounter: requestTimeoutCounter.withLabels(serviceName), - requestReceiveResetCounter: requestReceiveResetCounter.withLabels(serviceName), - requestSendResetCounter: requestSendResetCounter.withLabels(serviceName), - upstreamCodeCount: upstreamCodeCount.withLabels(serviceName), - upstreamCodeXCount: upstreamCodeXCount.withLabels(serviceName), - upstreamResponseTotal: upstreamResponseTotal.withLabels(namespace, kind, name, pod, serviceName), - upstreamResponseCode: upstreamResponseCode.withLabels(namespace, kind, name, pod, serviceName), + fgwHttpStatus: fgwHttpStatus.withLabels(serviceName), // metrics.js + fgwBandwidth: fgwBandwidth.withLabels(serviceName), // metrics.js + fgwHttpLatency: fgwHttpLatency.withLabels(serviceName), // metrics.js + sendBytesTotalCounter: sendBytesTotalCounter.withLabels(serviceName), // connect-tcp.js + receiveBytesTotalCounter: receiveBytesTotalCounter.withLabels(serviceName), // connect-tcp.js + activeConnectionGauge: activeConnectionGauge.withLabels(serviceName), // connect-tcp.js } )), - durationCache = new algo.Cache(serviceName => ( - fgwRequestDurationHist.withLabels(namespace, kind, name, pod, serviceName) - )), - ) => ( Object.keys(config?.Services || {}).forEach( @@ -140,19 +66,9 @@ ( metrics = metricsCache.get(serviceName), ) => ( - metrics.upstreamResponseTotal.zero(), - metrics.upstreamResponseCode.withLabels('5').zero(), metrics.activeConnectionGauge.zero(), metrics.receiveBytesTotalCounter.zero(), - metrics.sendBytesTotalCounter.zero(), - metrics.connectTimeoutCounter.zero(), - metrics.destroyLocalActiveCounter.zero(), - metrics.destroyRemoteActiveCounter.zero(), - metrics.pendingFailureEjectCounter.zero(), - metrics.pendingOverflowCounter.zero(), - metrics.requestTimeoutCounter.zero(), - metrics.requestReceiveResetCounter.zero(), - metrics.requestSendResetCounter.zero() + metrics.sendBytesTotalCounter.zero() ) )() ), @@ -160,7 +76,6 @@ { metrics, metricsCache, - durationCache, rateLimitCounter: new stats.Counter('http_local_rate_limiter', [ 'http_local_rate_limit' ]),