Skip to content

Commit

Permalink
build(deps): bump to latest fgw scripts (#315)
Browse files Browse the repository at this point in the history
Signed-off-by: Lin Yang <[email protected]>
  • Loading branch information
reaver-flomesh authored Jul 25, 2024
1 parent 997f242 commit a6deec8
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 44 deletions.
Binary file modified charts/fsm/components/scripts.tar.gz
Binary file not shown.
43 changes: 26 additions & 17 deletions charts/fsm/components/scripts/gateways/modules/backend-selector.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
export default function (config, protocol, rule, makeForwarder) {
var ruleFilters = makeFilters(rule?.filters)
var listenerFilterCaches = new algo.Cache(
protocol => new algo.Cache(
listener => makeFilters(protocol, listener?.filters)
)
)

export default function (config, protocol, listener, rule, makeForwarder) {
var ruleFilters = [
...listenerFilterCaches.get(protocol).get(listener),
...makeFilters(protocol, rule?.filters),
]

var refs = rule?.backendRefs || []
if (refs.length > 1) {
Expand All @@ -20,7 +29,7 @@ export default function (config, protocol, rule, makeForwarder) {
var backendResource = findBackendResource(backendRef)
var filters = [
...ruleFilters,
...makeFilters(backendRef?.filters),
...makeFilters(protocol, backendRef?.filters),
]
return {
id: backendRef?.name,
Expand All @@ -40,21 +49,21 @@ export default function (config, protocol, rule, makeForwarder) {
)
}
}
}

function makeFilters(filters) {
if (!filters) return []
return filters.map(
config => {
var maker = (
importFilter(`../config/filters/${protocol}/${config.type}.js`) ||
importFilter(`../filters/${protocol}/${config.type}.js`)
)
if (!maker) throw `${protocol} filter not found: ${config.type}`
if (typeof maker !== 'function') throw `filter ${config.type} is not a function`
return maker(config)
}
)
}
function makeFilters(protocol, filters) {
if (!filters) return []
return filters.map(
config => {
var maker = (
importFilter(`../config/filters/${protocol}/${config.type}.js`) ||
importFilter(`../filters/${protocol}/${config.type}.js`)
)
if (!maker) throw `${protocol} filter not found: ${config.type}`
if (typeof maker !== 'function') throw `filter ${config.type} is not a function`
return maker(config)
}
)
}

function importFilter(pathname) {
Expand Down
90 changes: 72 additions & 18 deletions charts/fsm/components/scripts/gateways/modules/forward-http.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import makeSessionPersistence from './session-persistence.js'
import { stringifyHTTPHeaders, findPolicies } from '../utils.js'
import { log } from '../log.js'

var backends = {}

var $ctx
var $selection
var $session

export default function (config, backendRef, backendResource, isHTTP2) {
var name = backendResource.metadata.name
var hc = makeHealthCheck(config, backendRef, backendResource)
var tls = makeBackendTLS(config, backendRef, backendResource)

Expand All @@ -18,6 +21,19 @@ export default function (config, backendRef, backendResource, isHTTP2) {
return { address, weight }
})

var backend = (backends[name] ??= {
name,
concurrency: 0,
targets: Object.fromEntries(
targets.map(({ address, weight }) => [
address, {
weight,
concurrency: 0,
}
])
)
})

var loadBalancer = new algo.LoadBalancer(
targets, {
key: t => t.address,
Expand All @@ -35,30 +51,41 @@ export default function (config, backendRef, backendResource, isHTTP2) {
if (sessionPersistence) {
var restoreSession = sessionPersistence.restore
var targetSelector = function (req) {
$selection = loadBalancer.allocate(
$session = loadBalancer.allocate(
restoreSession(req.head),
target => hc.isHealthy(target.address)
)
}
} else {
var targetSelector = function () {
$selection = loadBalancer.allocate(null, target => hc.isHealthy(target.address))
$session = loadBalancer.allocate(null, target => hc.isHealthy(target.address))
}
}

return pipeline($=>{
$.onStart(c => void ($ctx = c))
$.onStart(c => { $ctx = c })
$.pipe(evt => {
if (evt instanceof MessageStart) {
$ctx.backend = backend
targetSelector(evt)
log?.(
`Inb #${$ctx.parent.inbound.id} Req #${$ctx.id}`, evt.head.method, evt.head.path,
`forward ${$selection?.target?.address}`,
`forward ${$session?.target?.address}`,
`headers ${stringifyHTTPHeaders(evt.head.headers)}`,
)
return $selection ? forward : reject
return $session ? forward : reject
}
})
$.handleMessageStart(res => {
var r = $ctx.response
r.head = res.head
r.headTime = Date.now()
})
$.handleMessageEnd(res => {
var r = $ctx.response
r.tail = res.tail
r.tailTime = res.tailTime
})

if (log) {
$.handleMessageStart(
Expand All @@ -77,7 +104,11 @@ export default function (config, backendRef, backendResource, isHTTP2) {
)

var forward = pipeline($=>{
$.muxHTTP(() => $selection, { version: isHTTP2 ? 2 : 1 }).to($=>{
$.onStart(() => {
$ctx.sendTime = Date.now()
$ctx.target = $session.target.address
})
$.muxHTTP(() => $session, { version: isHTTP2 ? 2 : 1 }).to($=>{
if (tls) {
$.connectTLS({
...tls,
Expand All @@ -95,11 +126,11 @@ export default function (config, backendRef, backendResource, isHTTP2) {
if (sessionPersistence) {
var preserveSession = sessionPersistence.preserve
$.handleMessageStart(
res => preserveSession(res.head, $selection.target.address)
res => preserveSession(res.head, $session.target.address)
)
}

$.onEnd(() => $selection.free())
$.onEnd(() => $session.free())
})

if (retryConfig) {
Expand All @@ -123,26 +154,49 @@ export default function (config, backendRef, backendResource, isHTTP2) {
}
}).to($=>$
.pipe(forward)
.replaceMessageStart(
function (msg) {
if (retryCodes[msg.head.status]) {
.pipe(evt => {
if (evt instanceof MessageStart) {
var needRetry = (evt.head.status in retryCodes)
var wasRetry = ($retryCounter > 0)
if (wasRetry) {
$ctx.retries.push({
target: $ctx.target,
succeeded: !needRetry,
})
}
if (needRetry) {
if (++$retryCounter < retryConfig.numRetries) {
log?.(`Inb #${$ctx.parent.inbound.id} Req #${$ctx.id} retry ${$retryCounter} status ${msg.head.status}`)
return new StreamEnd
log?.(`Inb #${$ctx.parent.inbound.id} Req #${$ctx.id} retry ${$retryCounter} status ${evt.head.status}`)
return 'retry'
} else {
log?.(`Inb #${$ctx.parent.inbound.id} Req #${$ctx.id} retry ${$retryCounter} status ${msg.head.status} gave up`)
log?.(`Inb #${$ctx.parent.inbound.id} Req #${$ctx.id} retry ${$retryCounter} status ${evt.head.status} gave up`)
$retryCounter = 0
return 'conclude'
}
} else {
return 'conclude'
}
return msg
}
)
}, {
'retry': $=>$.replaceData().replaceMessage(new StreamEnd),
'conclude': $=>$,
})
)
)
}

function connect($) {
$.connect(() => $selection.target.address)
$.onStart(() => {
var t = backends[$session.target.address]
if (t) t.concurrency++
backend.concurrency++
})
$.connect(() => $session.target.address)
$.onEnd(() => {
var t = backends[$session.target.address]
if (t) t.concurrency--
backend.concurrency--
})
}
})
}
28 changes: 22 additions & 6 deletions charts/fsm/components/scripts/gateways/modules/route-http.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import { stringifyHTTPHeaders } from '../utils.js'
import { log } from '../log.js'

var $ctx
var $hostname
var $basePath
var $matchedRoute
var $matchedRule
var $selection

export default function (config, listener, routeResources) {
Expand Down Expand Up @@ -51,6 +54,7 @@ export default function (config, listener, routeResources) {
)
if (selector) $selection = selector(head)
}
$hostname = host
log?.(
`Inb #${$ctx.inbound.id} Req #${$ctx.messageCount+1}`, head.method, head.path,
`backend ${$selection?.target?.backendRef?.name}`,
Expand All @@ -60,12 +64,14 @@ export default function (config, listener, routeResources) {

function makeRuleSelector(routeResources) {
var kind = routeResources[0].kind
var rules = routeResources.flatMap(r => r.spec.rules).map(r => [r, makeBackendSelectorForRule(r, kind === 'GRPCRoute')])
var matches = rules.flatMap(([rule, backendSelector]) => {
var rules = routeResources.flatMap(resource => resource.spec.rules.map(
r => [r, makeBackendSelectorForRule(r, kind === 'GRPCRoute'), resource]
))
var matches = rules.flatMap(([rule, backendSelector, resource]) => {
if (rule.matches) {
return rule.matches.map(m => [m, backendSelector])
return rule.matches.map(m => [m, backendSelector, resource, rule])
} else {
return [[{}, backendSelector]]
return [[{}, backendSelector, resource, rule]]
}
})

Expand Down Expand Up @@ -105,7 +111,7 @@ export default function (config, listener, routeResources) {

switch (kind) {
case 'HTTPRoute':
matches = matches.map(([m, backendSelector], i) => {
matches = matches.map(([m, backendSelector, resource, rule], i) => {
var matchMethod = makeMethodMatcher(m.method)
var matchPath = makePathMatcher(m.path)
var matchHeaders = makeObjectMatcher(m.headers)
Expand All @@ -115,6 +121,8 @@ export default function (config, listener, routeResources) {
if (matchPath && !matchPath(head.path)) return false
if (matchHeaders && !matchHeaders(head.headers)) return false
if (matchParams && !matchParams(new URL(head.path).searchParams.toObject())) return false
$matchedRoute = resource
$matchedRule = rule
return true
}
return [matchFunc, backendSelector]
Expand Down Expand Up @@ -213,7 +221,7 @@ export default function (config, listener, routeResources) {
var sessionPersistenceConfig = rule.sessionPersistence
var sessionPersistence = sessionPersistenceConfig && makeSessionPersistence(sessionPersistenceConfig)
var selector = makeBackendSelector(
config, 'http', rule,
config, 'http', listener, rule,
function (backendRef, backendResource, filters) {
if (!backendResource) return response500
var forwarder = makeForwarder(config, backendRef, backendResource, isHTTP2)
Expand Down Expand Up @@ -250,18 +258,26 @@ export default function (config, listener, routeResources) {
$ctx = {
parent: $ctx,
id: ++$ctx.messageCount,
host: $hostname,
path: msg.head.path,
head: msg.head,
headTime: Date.now(),
tail: null,
tailTime: 0,
sendTime: 0,
response: {
head: null,
headTime: 0,
tail: null,
tailTime: 0,
},
basePath: $basePath,
routeResource: $matchedRoute,
routeRule: $matchedRule,
backendResource: $selection?.target?.backendResource,
backend: null,
target: '',
retries: [],
}
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export default function (config, listener, routeResources) {
var shutdown = pipeline($=>$.replaceStreamStart(new StreamEnd))

var selector = makeBackendSelector(
config, 'tcp',
config, 'tcp', listener,
routeResources[0]?.spec?.rules?.[0],
function (backendRef, backendResource, filters) {
var forwarder = backendResource ? makeForwarder(config, backendRef, backendResource) : shutdown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export default function (config, listener, routeResources) {
var hostnames = r.spec.hostnames || ['*']
hostnames.forEach(name => {
var selector = makeBackendSelector(
config, 'tcp', r.spec.rules?.[0],
config, 'tcp', listener, r.spec.rules?.[0],
function (backendRef, backendResource, filters) {
var forwarder = backendResource ? makeForwarder(config, backendRef, backendResource) : shutdown
return pipeline($=>$
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export default function (config, listener, routeResources) {
var shutdown = pipeline($=>$.replaceStreamStart(new StreamEnd))

var selector = makeBackendSelector(
config, 'udp',
config, 'udp', listener,
routeResources[0]?.spec?.rules?.[0],
function (backendRef, backendResource, filters) {
var forwarder = backendResource ? makeForwarder(config, backendRef, backendResource) : shutdown
Expand Down

0 comments on commit a6deec8

Please sign in to comment.