Skip to content

Commit

Permalink
增加metrics指标 (#32)
Browse files Browse the repository at this point in the history
* 优化uuid转uint63的实现方式(使用Int对象代替js代码)

* 调整访问上游的session关键字(用于上游连接共享)

* 增加fgw_http_status指标

* metrics增加上游健康检查状态
  • Loading branch information
wanpf authored Aug 25, 2023
1 parent 1469320 commit 05e90b9
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 17 deletions.
20 changes: 18 additions & 2 deletions pjs/common/health-check.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
((
{ config, isDebugEnabled } = pipy.solve('config.js'),

{ metrics } = pipy.solve('lib/metrics.js'),

healthCheckTargets = {},

healthCheckServices = {},
Expand Down Expand Up @@ -38,7 +40,12 @@
isDebugEnabled && (
console.log('[health-check] ok - service, type, target:', name, type, target.target)
)
)
),
// Publish this endpoint as healthy in the fgw_upstream_status gauge.
// An absolute value is written (rather than incrementing) so that repeated
// successful checks leave the gauge at 1; the fail branch writes 0 for the
// same (name, ip, port) label set, making the gauge a plain up/down flag.
metrics.fgwUpstreamStatus.withLabels(
  name,
  target.ip,
  target.port
).set(1)
),

fail: target => (
Expand All @@ -56,7 +63,12 @@
isDebugEnabled && (
console.log('[health-check] fail - service, type, target:', name, type, target.target)
)
)
),
// Publish this endpoint as down (0) in the fgw_upstream_status gauge,
// using the service name plus the endpoint's ip and port as labels.
metrics.fgwUpstreamStatus
  .withLabels(name, target.ip, target.port)
  .set(0)
),

available: target => (
Expand Down Expand Up @@ -113,6 +125,7 @@
healthCheckCache = new algo.Cache(makeHealthCheck),

) => pipy({
_idx: 0,
_service: null,
_target: null,
_resolve: null,
Expand All @@ -137,7 +150,10 @@
(_service = healthCheckCache.get(config.Services[name])) && (
Object.keys(config.Services[name].Endpoints || {}).forEach(
target => (
_idx = target.lastIndexOf(':'),
healthCheckTargets[target + '@' + name] = {
ip: target.substring(0, _idx),
port: target.substring(_idx + 1),
target,
service: _service,
alive: 1,
Expand Down
24 changes: 22 additions & 2 deletions pjs/http/codec.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,34 @@
pipy()
((
{
metrics,
metricsCache,
durationCache,
} = pipy.solve('lib/metrics.js'),
) => pipy()

.export('http', {
__http: null,
})

.pipeline()
// Stream start: bump both the 'accepted' and the 'active' series of the
// fgwHttpCurrentConnections metric.
.handleStreamStart(
() => (
metrics.fgwHttpCurrentConnections.withLabels('accepted').increase(),
metrics.fgwHttpCurrentConnections.withLabels('active').increase()
)
)
// Stream end: bump the 'handled' series and take one back off 'active',
// so 'active' reflects streams that have started but not yet ended.
.handleStreamEnd(
() => (
metrics.fgwHttpCurrentConnections.withLabels('handled').increase(),
metrics.fgwHttpCurrentConnections.withLabels('active').decrease()
)
)
.demuxHTTP().to(
$=>$
.handleMessageStart(
msg => (__http = msg?.head)
)
.chain()
)
)

)()
69 changes: 58 additions & 11 deletions pjs/http/metrics.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,38 @@
((
{
metrics,
metricsCache,
durationCache,
} = pipy.solve('lib/metrics.js'),
) => (

pipy({
_requestTime: null
_request: null,
_requestTime: null,
_requestSize: 0,
_metrics: null,
})

.import({
__service: 'service'
__domain: 'route',
__route: 'route',
__service: 'service',
__target: 'connect-tcp',
__consumer: 'consumer',
})

.pipeline()
.handleMessageStart(
() => (
_requestTime = Date.now()
(msg) => (
_request = msg,
// add HTTP header size
_requestTime = Date.now(),
metrics.fgwHttpRequestsTotal.increase()
)
)
.handleData(
data => (
_requestSize += data.size
)
)
.chain()
Expand All @@ -26,21 +42,52 @@ pipy({
serviceName = __service?.name,
status = msg?.head?.status,
statusClass = Math.floor(status / 100),
metrics = metricsCache.get(serviceName),
durationHist = durationCache.get(serviceName),
) => (
durationHist && durationHist.observe(Date.now() - _requestTime),
metrics && (
metrics.upstreamCompletedCount.increase(),
metrics.upstreamResponseTotal.increase(),
_metrics = metricsCache.get(serviceName),
_metrics && (

_metrics.upstreamCompletedCount.increase(),
_metrics.upstreamResponseTotal.increase(),
status && (
metrics.upstreamCodeCount.withLabels(status).increase(),
metrics.upstreamCodeXCount.withLabels(statusClass).increase(),
metrics.upstreamResponseCode.withLabels(statusClass).increase()

_metrics.fgwHttpStatus.withLabels(
status,
__route?.config?.route || '',
__route?.config?.Path?.Path || '',
__domain?.name || '',
__consumer?.name || '',
__target || '',
(_request?.head?.path || '').split('?')[0]
).increase(),

_metrics.fgwBandwidth.withLabels(
'egress',
__route?.config?.route || '',
__consumer?.name || '',
__inbound.remoteAddress || ''
).increase(_requestSize),
_requestSize = 0,
// add HTTP header size

_metrics.upstreamCodeCount.withLabels(status).increase(),
_metrics.upstreamCodeXCount.withLabels(statusClass).increase(),
_metrics.upstreamResponseCode.withLabels(statusClass).increase()
)
)
)
)()
)
// Adds the size of every data chunk seen at this stage to the fgwBandwidth
// counter under the 'ingress' type, labelled with route, consumer and the
// inbound remote address. Does nothing while _metrics is still unset.
// NOTE(review): this stage appears to run on the response side of the
// pipeline, while the accumulated request size is reported as 'egress' —
// confirm the two direction labels are not swapped.
.handleData(
data => (
_metrics && _metrics.fgwBandwidth.withLabels(
'ingress',
__route?.config?.route || '',
__consumer?.name || '',
__inbound.remoteAddress || ''
).increase(data.size)
)
)

))()
4 changes: 2 additions & 2 deletions pjs/lib/connect-tcp.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pipy({
.export('connect-tcp', {
__target: null,
__metricLabel: null,
__upstream: {},
__upstream: null,
})

.pipeline()
Expand Down Expand Up @@ -44,7 +44,7 @@ pipy({
)
.connect(() => __target)
.handleStreamEnd(
e => e.error && (__upstream.error = e.error)
e => e.error && (__upstream = {error: e.error})
)
.handleData(
data => (
Expand Down
30 changes: 30 additions & 0 deletions pjs/lib/metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,24 @@
pod,
} = pipy.solve('lib/utils.js'),

fgwHttpStatus = new stats.Counter('fgw_http_status', [
'service', 'code', 'route', 'matched_uri', 'matched_host', 'consumer', 'node', 'path'
]),

fgwBandwidth = new stats.Counter('fgw_bandwidth', [
'service', 'type', 'route', 'consumer', 'node'
]),

fgwHttpRequestsTotal = new stats.Gauge('fgw_http_requests_total'),

fgwHttpCurrentConnections = new stats.Gauge('fgw_http_current_connections', [
'state'
]),

fgwUpstreamStatus = new stats.Gauge('fgw_upstream_status', [
'name', 'ip', 'port'
]),

sendBytesTotalCounter = new stats.Counter('fgw_service_upstream_cx_tx_bytes_total', [
'fgw_service_name'
]),
Expand Down Expand Up @@ -79,8 +97,19 @@
'fgw_service_name'
]),

metrics = {
fgwHttpRequestsTotal,
fgwHttpCurrentConnections,
fgwUpstreamStatus,
},

metricsCache = new algo.Cache(serviceName => (
{
fgwHttpStatus: fgwHttpStatus.withLabels(serviceName),
fgwBandwidth: fgwBandwidth.withLabels(serviceName),
fgwHttpRequestsTotal,
fgwHttpCurrentConnections,

sendBytesTotalCounter: sendBytesTotalCounter.withLabels(serviceName),
receiveBytesTotalCounter: receiveBytesTotalCounter.withLabels(serviceName),
activeConnectionGauge: activeConnectionGauge.withLabels(serviceName),
Expand Down Expand Up @@ -129,6 +158,7 @@
),

{
metrics,
metricsCache,
durationCache,
rateLimitCounter: new stats.Counter('http_local_rate_limiter', [
Expand Down

0 comments on commit 05e90b9

Please sign in to comment.