Skip to content

Commit

Permalink
支持LeastWorkLoadBalancer等其他负载均衡算法 (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
wanpf authored Aug 31, 2023
1 parent 1d472e2 commit d2da1b9
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 31 deletions.
11 changes: 6 additions & 5 deletions pjs/common/health-check.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
maxFails = serviceConfig.HealthCheck?.MaxFails, // || 3, both
failTimeout = serviceConfig.HealthCheck?.FailTimeout, // || 300, passivity
uri = serviceConfig.HealthCheck?.Uri, // for HTTP
matches = serviceConfig.HealthCheck?.Matches || [{ Type: "status", Value: "200" }], // for HTTP
matches = serviceConfig.HealthCheck?.Matches || [{ Type: "status", Value: [ 200 ] }], // for HTTP
type = uri ? 'HTTP' : 'TCP',
) => (
{
Expand All @@ -38,7 +38,7 @@
healthCheckServices[name].remove(target.target)
),
isDebugEnabled && (
console.log('[health-check] ok - service, type, target:', name, type, target.target)
console.log('[health-check] ok - service, type, target:', name, type, target)
)
),
metrics.fgwUpstreamStatus.withLabels(
Expand All @@ -61,7 +61,7 @@
)
),
isDebugEnabled && (
console.log('[health-check] fail - service, type, target:', name, type, target.target)
console.log('[health-check] fail - service, type, target:', name, type, target)
)
),
metrics.fgwUpstreamStatus.withLabels(
Expand All @@ -81,7 +81,7 @@
m => (
(m?.Type === 'status') ? (
msg => (
msg?.head?.status == m?.Value
m?.Value?.includes(msg?.head?.status)
)
) : (
(m?.Type === 'body') ? (
Expand Down Expand Up @@ -111,7 +111,8 @@
target.service.match(result) ? (
target.service.ok(target)
) : (
target.service.fail(target)
target.service.fail(target),
target.reason = "status " + result?.head?.status
),
{}
)
Expand Down
39 changes: 33 additions & 6 deletions pjs/config.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,31 @@
{
"Configs": {
"EnableDebug": true
"EnableDebug": true,
"ShowUpstreamStatusInResponseHeader": true
},
"Consumers": [
{
"name": "test01",
"password": "123456",
"Headers-Authorization": {
"authorization": "Basic dGVzdDoxMjM0NTY="
}
}
],
"Listeners": [
{
"Protocol": "HTTP",
"Port": 8080
"Port": 8080,
"AccessControlLists": {
"blacklist": [
"127.0.0.11",
"192.168.122.1/32"
],
"whitelist": [
"192.168.122.18",
"127.0.0.1/32"
]
}
},
{
"Protocol": "HTTP",
Expand All @@ -22,6 +42,7 @@
"RouteType": "HTTP",
"Matches": [
{
"route": "route-id-001",
"Path": {
"Type": "Prefix",
"Path": "/"
Expand All @@ -37,7 +58,7 @@
"*": {
"Matches": [
{
"ServerRoot": "/var/www/html",
"ServerRoot": "www1",
"Index": [
"index.html",
"index.htm"
Expand Down Expand Up @@ -77,16 +98,23 @@
"Matches": [
{
"Type": "status",
"Value": "200"
"Value": [
200,
201
]
}
]
},
"Algorithm": "LeastWorkLoadBalancer",
"Endpoints": {
"127.0.0.1:8081": {
"Weight": 50
},
"127.0.0.1:8082": {
"Weight": 50
},
"127.0.0.1:8083": {
"Weight": 50
}
}
}
Expand Down Expand Up @@ -152,6 +180,5 @@
"tcp/forward.js"
]
},

"Version": "0"
}
}
32 changes: 21 additions & 11 deletions pjs/http/forward.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,20 @@
serviceConfig && (
(
endpointAttributes = {},
obj = {
targetBalancer: serviceConfig.Endpoints && new algo.RoundRobinLoadBalancer(
shuffle(Object.fromEntries(Object.entries(serviceConfig.Endpoints)
endpoints = shuffle(
Object.fromEntries(
Object.entries(serviceConfig.Endpoints)
.map(([k, v]) => (endpointAttributes[k] = v, v.hash = algo.hash(k), [k, v.Weight]))
.filter(([k, v]) => v > 0)
))
.filter(([k, v]) => (serviceConfig.Algorithm !== 'RoundRobinLoadBalancer' || v > 0))
)
),
obj = {
targetBalancer: serviceConfig.Endpoints && (
(serviceConfig.Algorithm === 'HashingLoadBalancer') ? (
new algo.HashingLoadBalancer(Object.keys(endpoints))
) : (
new algo[serviceConfig.Algorithm || 'RoundRobinLoadBalancer'](endpoints)
)
),
endpointAttributes,
...(serviceConfig.StickyCookieName && ({
Expand Down Expand Up @@ -135,6 +143,7 @@
_isRetry: false,
_unhealthCache: null,
_healthCheckTarget: null,
_targetResource: null,
})

.import({
Expand Down Expand Up @@ -199,7 +208,9 @@
_cookieId ? (
__target = _cookieId
) : (
__target = _targetBalancer?.borrow?.({}, undefined, _unhealthCache)?.id
(_targetResource = _targetBalancer?.borrow?.(undefined, undefined, _unhealthCache)) && (
__target = _targetResource?.id
)
),
__target
) && (
Expand Down Expand Up @@ -252,7 +263,7 @@
)
),
(
$=>$.muxHTTP(() => (__service.name + __target), () => _muxHttpOptions).to(
$=>$.muxHTTP(() => _targetResource, () => _muxHttpOptions).to(
$=>$.branch(
() => __cert, (
$=>$.use('lib/connect-tls.js')
Expand All @@ -273,10 +284,9 @@
)
),
(_healthCheckTarget = __healthCheckTargets?.[__target + '@' + __service.name]) && (
(!msg?.head?.status || (msg?.head?.status > 499)) ? (
_healthCheckTarget.service.fail(_healthCheckTarget)
) : (
_healthCheckTarget.service.ok(_healthCheckTarget)
(__upstream?.error === 'ConnectionRefused') && (
_healthCheckTarget.service.fail(_healthCheckTarget),
_healthCheckTarget.reason = 'ConnectionRefused'
)
)
)
Expand Down
7 changes: 3 additions & 4 deletions pjs/lib/connect-tcp.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
((
{ isDebugEnabled } = pipy.solve('config.js'),
{
metricsCache,
} = pipy.solve('lib/metrics.js'),
{ metrics, metricsCache } = pipy.solve('lib/metrics.js'),
) => (

pipy({
Expand All @@ -19,7 +17,8 @@ pipy({
.onStart(
() => void (
_metrics = metricsCache.get(__metricLabel),
_metrics.activeConnectionGauge.increase()
_metrics.activeConnectionGauge.increase(),
metrics.fgwStreamConnectionTotal.withLabels(__metricLabel).increase()
)
)
.onEnd(
Expand Down
5 changes: 5 additions & 0 deletions pjs/lib/metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,15 @@
'service'
]),

fgwStreamConnectionTotal = new stats.Counter('fgw_stream_connection_total', [
'route'
]),

metrics = {
fgwHttpRequestsTotal, // codec.js
fgwHttpCurrentConnections, // codec.js
fgwUpstreamStatus, // health-check.js
fgwStreamConnectionTotal, // connect-tcp.js
},

metricsCache = new algo.Cache(serviceName => (
Expand Down
18 changes: 13 additions & 5 deletions pjs/tcp/forward.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@
serviceConfig && (
(
endpointAttributes = {},
endpoints = shuffle(
Object.fromEntries(
Object.entries(serviceConfig.Endpoints)
.map(([k, v]) => (endpointAttributes[k] = v, v.hash = algo.hash(k), [k, v.Weight]))
.filter(([k, v]) => (serviceConfig.Algorithm !== 'RoundRobinLoadBalancer' || v > 0))
)
),
obj = {
targetBalancer: serviceConfig.Endpoints && new algo.RoundRobinLoadBalancer(
shuffle(Object.fromEntries(Object.entries(serviceConfig.Endpoints)
.map(([k, v]) => (endpointAttributes[k] = v, [k, v.Weight]))
.filter(([k, v]) => v > 0)
))
targetBalancer: serviceConfig.Endpoints && (
(serviceConfig.Algorithm === 'HashingLoadBalancer') ? (
new algo.HashingLoadBalancer(Object.keys(endpoints))
) : (
new algo[serviceConfig.Algorithm || 'RoundRobinLoadBalancer'](endpoints)
)
),
endpointAttributes,
failoverBalancer: serviceConfig.Endpoints && failover(Object.fromEntries(Object.entries(serviceConfig.Endpoints).map(([k, v]) => [k, v.Weight]))),
Expand Down

0 comments on commit d2da1b9

Please sign in to comment.