Skip to content

Commit

Permalink
增加LeastConnectionLoadBalancer负载均衡策略(config.json里面配置使用) (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
wanpf authored Aug 31, 2023
1 parent d2da1b9 commit b669e13
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 12 deletions.
17 changes: 7 additions & 10 deletions pjs/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,8 @@
"Protocol": "HTTP",
"Port": 8080,
"AccessControlLists": {
"blacklist": [
"127.0.0.11",
"192.168.122.1/32"
],
"whitelist": [
"192.168.122.18",
"127.0.0.1/32"
]
"blacklist": [],
"whitelist": []
}
},
{
Expand Down Expand Up @@ -93,7 +87,6 @@
"HealthCheck": {
"Interval": 10,
"MaxFails": 3,
"FailTimeout": 30,
"Uri": "/",
"Matches": [
{
Expand All @@ -105,7 +98,7 @@
}
]
},
"Algorithm": "LeastWorkLoadBalancer",
"Algorithm": "LeastConnectionLoadBalancer",
"Endpoints": {
"127.0.0.1:8081": {
"Weight": 50
Expand All @@ -131,6 +124,8 @@
"http/metrics.js",
"http/tracing.js",
"http/logging.js",
"http/access-control-domain.js",
"http/access-control-route.js",
"http/circuit-breaker.js",
"http/throttle-domain.js",
"http/throttle-route.js",
Expand All @@ -152,6 +147,8 @@
"http/metrics.js",
"http/tracing.js",
"http/logging.js",
"http/access-control-domain.js",
"http/access-control-route.js",
"http/circuit-breaker.js",
"http/throttle-domain.js",
"http/throttle-route.js",
Expand Down
101 changes: 101 additions & 0 deletions pjs/http/access-control-domain.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
((
{ isDebugEnabled } = pipy.solve('config.js'),

{ aclCounter } = pipy.solve('lib/metrics.js'),
aclDomainCounter = aclCounter.withLabels('domain'),

parseIpList = ipList => (
(ips, ipRanges) => (
(ipList || []).forEach(
o => (
o.indexOf('/') > 0 ? (
!ipRanges && (ipRanges = []),
ipRanges.push(new Netmask(o))
) : (
!ips && (ips = {}),
ips[o] = true
)
)
),
(ips || ipRanges) ? ({ ips, ipRanges }) : undefined
)
)(),

aclsCache = new algo.Cache(
acls => (
{
blackList: parseIpList(acls?.blacklist),
whiteList: parseIpList(acls?.whitelist),
}
)
),

checkACLs = ip => (
(
acls = aclsCache.get(__domain?.AccessControlLists),
white = acls?.whiteList,
black = acls?.blackList,
blackMode = true,
block = false,
pass = false,
) => (
white && (
blackMode = false,
(white?.ips?.[ip] && (pass = true)) || (
pass = white?.ipRanges?.find?.(r => r.contains(ip))
)
),
blackMode && (
(black?.ips?.[ip] && (block = true)) || (
block = black?.ipRanges?.find?.(r => r.contains(ip))
)
),
blackMode ? Boolean(!block) : Boolean(pass)
)
)(),
) => pipy({
_ips: null,
_pass: false,
})

.import({
__domain: 'route',
})

.pipeline()
.handleMessageStart(
msg => (
__domain?.AccessControlLists?.enableXFF && (
_ips = msg.head?.headers['x-forwarded-for']
),
_ips ? (
_pass = _ips.split(',').every(ip => checkACLs(ip.trim()))
) : (
_pass = checkACLs(__inbound.remoteAddress)
)
)
)
.branch(
() => _pass, (
$=>$.chain()
), (
$=>$
.branch(
isDebugEnabled, (
$=>$.handleStreamStart(
() => (
console.log('[access-control-domain] blocked XFF, IP address:', _ips, __inbound.remoteAddress, _pass)
)
)
)
)
.replaceMessage(
() => (
aclDomainCounter.increase(),
new Message({ status: __domain?.AccessControlLists?.status || 403 }, __domain?.AccessControlLists?.message || '')
)
)
)
)

)()
101 changes: 101 additions & 0 deletions pjs/http/access-control-route.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
((
{ isDebugEnabled } = pipy.solve('config.js'),

{ aclCounter } = pipy.solve('lib/metrics.js'),
aclRouteCounter = aclCounter.withLabels('route'),

parseIpList = ipList => (
(ips, ipRanges) => (
(ipList || []).forEach(
o => (
o.indexOf('/') > 0 ? (
!ipRanges && (ipRanges = []),
ipRanges.push(new Netmask(o))
) : (
!ips && (ips = {}),
ips[o] = true
)
)
),
(ips || ipRanges) ? ({ ips, ipRanges }) : undefined
)
)(),

aclsCache = new algo.Cache(
acls => (
{
blackList: parseIpList(acls?.blacklist),
whiteList: parseIpList(acls?.whitelist),
}
)
),

checkACLs = ip => (
(
acls = aclsCache.get(__route?.config?.AccessControlLists),
white = acls?.whiteList,
black = acls?.blackList,
blackMode = true,
block = false,
pass = false,
) => (
white && (
blackMode = false,
(white?.ips?.[ip] && (pass = true)) || (
pass = white?.ipRanges?.find?.(r => r.contains(ip))
)
),
blackMode && (
(black?.ips?.[ip] && (block = true)) || (
block = black?.ipRanges?.find?.(r => r.contains(ip))
)
),
blackMode ? Boolean(!block) : Boolean(pass)
)
)(),
) => pipy({
_ips: null,
_pass: false,
})

.import({
__route: 'route',
})

.pipeline()
.handleMessageStart(
msg => (
__route?.config?.AccessControlLists?.enableXFF && (
_ips = msg.head?.headers['x-forwarded-for']
),
_ips ? (
_pass = _ips.split(',').every(ip => checkACLs(ip.trim()))
) : (
_pass = checkACLs(__inbound.remoteAddress)
)
)
)
.branch(
() => _pass, (
$=>$.chain()
), (
$=>$
.branch(
isDebugEnabled, (
$=>$.handleStreamStart(
() => (
console.log('[access-control-route] blocked XFF, IP address:', _ips, __inbound.remoteAddress, _pass)
)
)
)
)
.replaceMessage(
() => (
aclRouteCounter.increase(),
new Message({ status: __route?.config?.AccessControlLists?.status || 403 }, __route?.config?.AccessControlLists?.message || '')
)
)
)
)

)()
6 changes: 5 additions & 1 deletion pjs/http/forward.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
(serviceConfig.Algorithm === 'HashingLoadBalancer') ? (
new algo.HashingLoadBalancer(Object.keys(endpoints))
) : (
new algo[serviceConfig.Algorithm || 'RoundRobinLoadBalancer'](endpoints)
(serviceConfig.Algorithm === 'LeastConnectionLoadBalancer') ? (
new algo.LeastWorkLoadBalancer(Object.keys(endpoints))
) : (
new algo[serviceConfig.Algorithm || 'RoundRobinLoadBalancer'](endpoints)
)
)
),
endpointAttributes,
Expand Down
3 changes: 3 additions & 0 deletions pjs/lib/metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@
rateLimitCounter: new stats.Counter('http_local_rate_limiter', [
'http_local_rate_limit'
]),
aclCounter: new stats.Counter('access_control', [
'type'
]),
}
)

Expand Down
6 changes: 5 additions & 1 deletion pjs/tcp/forward.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@
(serviceConfig.Algorithm === 'HashingLoadBalancer') ? (
new algo.HashingLoadBalancer(Object.keys(endpoints))
) : (
new algo[serviceConfig.Algorithm || 'RoundRobinLoadBalancer'](endpoints)
(serviceConfig.Algorithm === 'LeastConnectionLoadBalancer') ? (
new algo.LeastWorkLoadBalancer(Object.keys(endpoints))
) : (
new algo[serviceConfig.Algorithm || 'RoundRobinLoadBalancer'](endpoints)
)
)
),
endpointAttributes,
Expand Down

0 comments on commit b669e13

Please sign in to comment.