Skip to content

Commit

Permalink
整理(新增、删除)metrics指标 (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
wanpf authored Aug 28, 2023
1 parent 05e90b9 commit 1d472e2
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 181 deletions.
16 changes: 10 additions & 6 deletions pjs/http/codec.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
((
{
metrics,
metricsCache,
durationCache,
} = pipy.solve('lib/metrics.js'),
{ metrics } = pipy.solve('lib/metrics.js'),
) => pipy()

.export('http', {
__http: null,
__request: null,
})

.pipeline()
Expand All @@ -26,7 +23,14 @@
.demuxHTTP().to(
$=>$
.handleMessageStart(
msg => (__http = msg?.head)
msg => (
__http = msg?.head,
__request = { head: msg?.head, reqTime: Date.now() },
metrics.fgwHttpRequestsTotal.increase()
)
)
.handleMessageEnd(
msg => __request.tail = msg.tail
)
.chain()
)
Expand Down
120 changes: 57 additions & 63 deletions pjs/http/metrics.js
Original file line number Diff line number Diff line change
@@ -1,92 +1,86 @@
((
{
metrics,
metricsCache,
durationCache,
} = pipy.solve('lib/metrics.js'),
{ metricsCache } = pipy.solve('lib/metrics.js'),
) => (

pipy({
_request: null,
_requestTime: null,
_requestSize: 0,
_metrics: null,
_route: null,
_consumer: null,
_target: null,
_status: null,
_requestTime: null,
_responseTime: null,
})

.import({
__request: 'http',
__domain: 'route',
__route: 'route',
__service: 'service',
__target: 'connect-tcp',
__consumer: 'consumer',
__target: 'connect-tcp',
})

.pipeline()
.handleMessageStart(
(msg) => (
_request = msg,
// add HTTP header size
_requestTime = Date.now(),
metrics.fgwHttpRequestsTotal.increase()
)
)
.handleData(
data => (
_requestSize += data.size
() => (
_requestTime = Date.now()
)
)
.chain()
.handleMessageStart(
(msg) => (
(
serviceName = __service?.name,
status = msg?.head?.status,
statusClass = Math.floor(status / 100),
durationHist = durationCache.get(serviceName),
) => (
durationHist && durationHist.observe(Date.now() - _requestTime),
_metrics = metricsCache.get(serviceName),
_metrics && (

_metrics.upstreamCompletedCount.increase(),
_metrics.upstreamResponseTotal.increase(),
status && (

_metrics.fgwHttpStatus.withLabels(
status,
__route?.config?.route || '',
__route?.config?.Path?.Path || '',
__domain?.name || '',
__consumer?.name || '',
__target || '',
(_request?.head?.path || '').split('?')[0]
).increase(),
_route = __route?.config?.route || '',
_consumer = __consumer?.name || '',
_target = __target || '',
_status = msg?.head?.status,
_metrics = metricsCache.get(__service?.name),

_metrics.fgwBandwidth.withLabels(
'egress',
__route?.config?.route || '',
__consumer?.name || '',
__inbound.remoteAddress || ''
).increase(_requestSize),
_requestSize = 0,
// add HTTP header size
__request.tail && _metrics.fgwBandwidth.withLabels(
'egress',
_route,
_consumer,
__inbound.remoteAddress || ''
).increase(__request.tail.headSize + __request.tail.bodySize),

_metrics.upstreamCodeCount.withLabels(status).increase(),
_metrics.upstreamCodeXCount.withLabels(statusClass).increase(),
_metrics.upstreamResponseCode.withLabels(statusClass).increase()
)
)
)
)()
_status && _metrics.fgwHttpStatus.withLabels(
_status,
_route,
__route?.config?.Path?.Path || '',
__domain?.name || '',
_consumer,
_target
).increase()
)
)
.handleData(
data => (
_metrics && _metrics.fgwBandwidth.withLabels(
.handleMessageEnd(
msg => (
_responseTime = Date.now(),
_metrics.fgwHttpLatency.withLabels(
_route,
_consumer,
'upstream',
_target
).observe(_responseTime - _requestTime),
_metrics.fgwHttpLatency.withLabels(
_route,
_consumer,
'fgw',
_target
).observe(_requestTime - __request.reqTime),
_metrics.fgwHttpLatency.withLabels(
_route,
_consumer,
'request',
_target
).observe(_responseTime - __request.reqTime),

msg.tail && _metrics.fgwBandwidth.withLabels(
'ingress',
__route?.config?.route || '',
__consumer?.name || '',
_route,
_consumer,
__inbound.remoteAddress || ''
).increase(data.size)
).increase(msg.tail.headSize + msg.tail.bodySize)
)
)

Expand Down
139 changes: 27 additions & 112 deletions pjs/lib/metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,8 @@
(
config = pipy.solve('config.js'),

{
namespace,
kind,
name,
pod,
} = pipy.solve('lib/utils.js'),

fgwHttpStatus = new stats.Counter('fgw_http_status', [
'service', 'code', 'route', 'matched_uri', 'matched_host', 'consumer', 'node', 'path'
'service', 'code', 'route', 'matched_uri', 'matched_host', 'consumer', 'node'
]),

fgwBandwidth = new stats.Counter('fgw_bandwidth', [
Expand All @@ -27,140 +20,62 @@
'name', 'ip', 'port'
]),

sendBytesTotalCounter = new stats.Counter('fgw_service_upstream_cx_tx_bytes_total', [
'fgw_service_name'
]),
receiveBytesTotalCounter = new stats.Counter('fgw_service_upstream_cx_rx_bytes_total', [
'fgw_service_name'
]),
activeConnectionGauge = new stats.Gauge('fgw_service_upstream_cx_active', [
'fgw_service_name'
]),
upstreamCompletedCount = new stats.Counter('fgw_service_external_upstream_rq_completed', [
'fgw_service_name'
]),
destroyRemoteActiveCounter = new stats.Counter('fgw_service_upstream_cx_destroy_remote_with_active_rq', [
'fgw_service_name'
]),
destroyLocalActiveCounter = new stats.Counter('fgw_service_upstream_cx_destroy_local_with_active_rq', [
'fgw_service_name'
]),
connectTimeoutCounter = new stats.Counter('fgw_service_upstream_cx_connect_timeout', [
'fgw_service_name'
]),
pendingFailureEjectCounter = new stats.Counter('fgw_service_upstream_rq_pending_failure_eject', [
'fgw_service_name'
]),
pendingOverflowCounter = new stats.Counter('fgw_service_upstream_rq_pending_overflow', [
'fgw_service_name'
]),
requestTimeoutCounter = new stats.Counter('fgw_service_upstream_rq_timeout', [
'fgw_service_name'
]),
requestReceiveResetCounter = new stats.Counter('fgw_service_upstream_rq_rx_reset', [
'fgw_service_name'
]),
requestSendResetCounter = new stats.Counter('fgw_service_upstream_rq_tx_reset', [
'fgw_service_name'
]),
upstreamCodeCount = new stats.Counter('fgw_service_external_upstream_rq', [
'fgw_service_name',
'fgw_response_code'
]),
upstreamCodeXCount = new stats.Counter('fgw_service_external_upstream_rq_xx', [
'fgw_service_name',
'fgw_response_code_class'
fgwHttpLatency = new stats.Histogram('fgw_http_latency', [
1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 30000, 60000, 300000, 600000, 1800000, 3600000, Infinity
], [
'service',
'route',
'consumer',
'type',
'node'
]),
upstreamResponseTotal = new stats.Counter('fgw_service_upstream_rq_total', [
'source_namespace',
'source_workload_kind',
'source_workload_name',
'source_workload_pod',
'fgw_service_name'

sendBytesTotalCounter = new stats.Counter('fgw_upstream_tx_bytes_total', [
'service'
]),
upstreamResponseCode = new stats.Counter('fgw_service_upstream_rq_xx', [
'source_namespace',
'source_workload_kind',
'source_workload_name',
'source_workload_pod',
'fgw_service_name',
'fgw_response_code_class'

receiveBytesTotalCounter = new stats.Counter('fgw_upstream_rx_bytes_total', [
'service'
]),

fgwRequestDurationHist = new stats.Histogram('fgw_request_duration_ms', [
5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 30000, 60000, 300000, 600000, 1800000, 3600000, Infinity
], [
'source_namespace',
'source_kind',
'source_name',
'source_pod',
'fgw_service_name'
activeConnectionGauge = new stats.Gauge('fgw_upstream_connection_active', [
'service'
]),

metrics = {
fgwHttpRequestsTotal,
fgwHttpCurrentConnections,
fgwUpstreamStatus,
fgwHttpRequestsTotal, // codec.js
fgwHttpCurrentConnections, // codec.js
fgwUpstreamStatus, // health-check.js
},

metricsCache = new algo.Cache(serviceName => (
{
fgwHttpStatus: fgwHttpStatus.withLabels(serviceName),
fgwBandwidth: fgwBandwidth.withLabels(serviceName),
fgwHttpRequestsTotal,
fgwHttpCurrentConnections,

sendBytesTotalCounter: sendBytesTotalCounter.withLabels(serviceName),
receiveBytesTotalCounter: receiveBytesTotalCounter.withLabels(serviceName),
activeConnectionGauge: activeConnectionGauge.withLabels(serviceName),
upstreamCompletedCount: upstreamCompletedCount.withLabels(serviceName),
destroyRemoteActiveCounter: destroyRemoteActiveCounter.withLabels(serviceName),
destroyLocalActiveCounter: destroyLocalActiveCounter.withLabels(serviceName),
connectTimeoutCounter: connectTimeoutCounter.withLabels(serviceName),
pendingFailureEjectCounter: pendingFailureEjectCounter.withLabels(serviceName),
pendingOverflowCounter: pendingOverflowCounter.withLabels(serviceName),
requestTimeoutCounter: requestTimeoutCounter.withLabels(serviceName),
requestReceiveResetCounter: requestReceiveResetCounter.withLabels(serviceName),
requestSendResetCounter: requestSendResetCounter.withLabels(serviceName),
upstreamCodeCount: upstreamCodeCount.withLabels(serviceName),
upstreamCodeXCount: upstreamCodeXCount.withLabels(serviceName),
upstreamResponseTotal: upstreamResponseTotal.withLabels(namespace, kind, name, pod, serviceName),
upstreamResponseCode: upstreamResponseCode.withLabels(namespace, kind, name, pod, serviceName),
fgwHttpStatus: fgwHttpStatus.withLabels(serviceName), // metrics.js
fgwBandwidth: fgwBandwidth.withLabels(serviceName), // metrics.js
fgwHttpLatency: fgwHttpLatency.withLabels(serviceName), // metrics.js
sendBytesTotalCounter: sendBytesTotalCounter.withLabels(serviceName), // connect-tcp.js
receiveBytesTotalCounter: receiveBytesTotalCounter.withLabels(serviceName), // connect-tcp.js
activeConnectionGauge: activeConnectionGauge.withLabels(serviceName), // connect-tcp.js
}
)),

durationCache = new algo.Cache(serviceName => (
fgwRequestDurationHist.withLabels(namespace, kind, name, pod, serviceName)
)),

) => (

Object.keys(config?.Services || {}).forEach(
serviceName => (
(
metrics = metricsCache.get(serviceName),
) => (
metrics.upstreamResponseTotal.zero(),
metrics.upstreamResponseCode.withLabels('5').zero(),
metrics.activeConnectionGauge.zero(),
metrics.receiveBytesTotalCounter.zero(),
metrics.sendBytesTotalCounter.zero(),
metrics.connectTimeoutCounter.zero(),
metrics.destroyLocalActiveCounter.zero(),
metrics.destroyRemoteActiveCounter.zero(),
metrics.pendingFailureEjectCounter.zero(),
metrics.pendingOverflowCounter.zero(),
metrics.requestTimeoutCounter.zero(),
metrics.requestReceiveResetCounter.zero(),
metrics.requestSendResetCounter.zero()
metrics.sendBytesTotalCounter.zero()
)
)()
),

{
metrics,
metricsCache,
durationCache,
rateLimitCounter: new stats.Counter('http_local_rate_limiter', [
'http_local_rate_limit'
]),
Expand Down

0 comments on commit 1d472e2

Please sign in to comment.