Skip to content

Commit

Permalink
Merge pull request #7 from microservice-framework/2.x-issue-6
Browse files Browse the repository at this point in the history
2.x issue #6 working minimal `pre`
  • Loading branch information
Gormartsen authored Mar 10, 2019
2 parents 8280fdd + 351731c commit 9bdb0c5
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 23 deletions.
10 changes: 5 additions & 5 deletions router-admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,18 @@ function adminPOST(jsonData, requestDetails, callback) {
*/
function adminSearch(jsonData, requestDetails, callback) {
// Version 1.x compatibility.
if(jsonData.query) {
if(!jsonData.query.type) {
if (jsonData.query) {
if (!jsonData.query.type) {
jsonData.query.type = { $eq: 'handler'}
}
if(!jsonData.query.online) {
if (!jsonData.query.online) {
jsonData.query.online = true
}
} else {
if(!jsonData.type) {
if (!jsonData.type) {
jsonData.type = { $eq: 'handler'}
}
if(!jsonData.online) {
if (!jsonData.online) {
jsonData.online = true
}
}
Expand Down
140 changes: 125 additions & 15 deletions router-proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ const ExplorerClass = require('./includes/explorerClass.js');

var debug = {
log: debugF('proxy:log'),
debug: debugF('proxy:debug')
debug: debugF('proxy:debug'),
debugMetric: debugF('proxy:metric'),
debugHook: debugF('proxy:hook')
};


Expand Down Expand Up @@ -263,7 +265,15 @@ function checkConditions(conditions, requestDetails, jsonData) {
function matchRoute(targetRequest, routeItem) {
let routeItems = targetRequest.route.split('/');


if(routeItem.type == "metric") {
if (routeItem.conditions) {
if (!checkConditions(routeItem.conditions,
targetRequest.requestDetails, targetRequest.jsonData)) {
return false
}
}
return true
}
if (routeItem.path && routeItem.path.length == 1 && routeItem.path[0] == '*') {
if (routeItem.conditions) {
if (!checkConditions(routeItem.conditions,
Expand Down Expand Up @@ -387,10 +397,10 @@ function hookCall(targetRequest, phase, callback) {
debug.log('broadcast sent');
// If more in queue left - send more
if (broadcastTargets.length) {
_request(getBroadcastRequest, callbackBroadcastRequest)
_request(getBroadcastRequest, callbackBroadcastRequest, targetRequest)
}
}
_request(getBroadcastRequest, callbackBroadcastRequest)
_request(getBroadcastRequest, callbackBroadcastRequest, targetRequest)
}

// send Notify
Expand Down Expand Up @@ -452,10 +462,10 @@ function hookCall(targetRequest, phase, callback) {
// If more groups left - send more
if (notifyGroups.length) {
currentNotifyGroup = notifyGroups.shift()
_request(getNotifyRequest, callbackNotifyRequest)
_request(getNotifyRequest, callbackNotifyRequest, targetRequest)
}
}
_request(getNotifyRequest, callbackNotifyRequest)
_request(getNotifyRequest, callbackNotifyRequest, targetRequest)
}
}

Expand Down Expand Up @@ -571,20 +581,20 @@ function hookCall(targetRequest, phase, callback) {
// If more groups left - send more
if (adapterGroups.length) {
currentAdapterGroup = adapterGroups.shift()
return _request(getAdapterRequest, callbackAdapterRequest)
return _request(getAdapterRequest, callbackAdapterRequest, targetRequest)
}
// return back via callback
callback()
}
_request(getAdapterRequest, callbackAdapterRequest)
_request(getAdapterRequest, callbackAdapterRequest, targetRequest)

}

/**
* Find all hook routes by stage.
*/
function findHookTarget(targetRequest, phase, type, group){
debug.debug('Find all hooks route: %s phase: %s type: %s group: %s',
debug.debugHook('Find all hooks route: %s phase: %s type: %s group: %s',
targetRequest.route, phase, type, group);
let allHookTargets = findAllTargets(targetRequest, 'hook')
if (allHookTargets instanceof Error) {
Expand All @@ -597,7 +607,7 @@ function findHookTarget(targetRequest, phase, type, group){
continue
}
for (let hook of target.hook) {
if (hook.phase !== phase) {
if (phase !== null && hook.phase !== phase) {
continue
}
if (hook.type !== type) {
Expand All @@ -622,6 +632,7 @@ function findHookTarget(targetRequest, phase, type, group){
if (!finalHookTable.length) {
debug.debug('Not found for %s', targetRequest.route);
debug.log('Hook instance %s not found', group);
debug.debugHook('Hook instance %s not found', group);
return new Error('Hook instance not found');
}
return finalHookTable
Expand Down Expand Up @@ -667,7 +678,7 @@ function getMinLoadedRouter(availableRoutes) {
cpu:0,
loadavg: 0
}
if(minRouter.metrics) {
if (minRouter.metrics) {
totalCPU = minRouter.metrics.reduce(function(a, b) {

return {
Expand All @@ -683,7 +694,7 @@ function getMinLoadedRouter(availableRoutes) {
cpu:0,
loadavg: 0
}
if(availableRoutes[i].metrics) {
if (availableRoutes[i].metrics) {
totalCPU = availableRoutes[i].metrics.reduce(function(a, b) {
return {
cpu : parseFloat(a.cpu) + parseFloat(b.cpu) + a.loadavg[0] + b.loadavg[0],
Expand All @@ -701,7 +712,7 @@ function getMinLoadedRouter(availableRoutes) {
return minRouter;
}

function _request(getRequest, callback) {
function _request(getRequest, callback, targetRequest, noMetric) {
let requestOptions = getRequest()

if (requestOptions instanceof Error) {
Expand All @@ -716,12 +727,111 @@ function _request(getRequest, callback) {
if (!(uri.host || (uri.hostname && uri.port)) && !uri.isUnix) {
return callback(new Error('Invalid URI' + requestOptions.uri))
}

let getHeaders = function(router, hookType){
let headers = {};
// TODO verify date,content-type, transfer-encoding headers
let skipHeaders = [
'host',
'content-type',
'date',
'connection',
'content-length',
'transfer-encoding'
]
for (var i in targetRequest.requestDetails.headers) {
if (skipHeaders.indexOf(i) != -1) {
continue
}
headers[i] = targetRequest.requestDetails.headers[i];
}
for (var i in router.matchVariables) {
headers['mfw-' + i] = router.matchVariables[i];
}
headers['x-origin-url'] = targetRequest.route
headers['x-origin-method'] = targetRequest.method
headers['x-endpoint-scope'] = targetRequest.endpoint.scope
debug.debug('%s headers %O', targetRequest.route, headers);
return headers;
}
let startTime = Date.now();
request(requestOptions, function(error, response, body) {
let endTime = Date.now();
debug.debugMetric('requestOptions: %O time: %s', requestOptions, endTime - startTime);
if (!noMetric) {
let metricTargets = findAllTargets(targetRequest, 'metric')
debug.debugMetric('findHookTarget: for %s result: %O', targetRequest.route, metricTargets);

if (metricTargets instanceof Array) {
let getMetricRequest = function(){
if (metricTargets instanceof Error) {
return metricTargets
}
if (!metricTargets.length) {
return false
}
let router = metricTargets.pop()
debug.log('Metric route %s result %O', targetRequest.route, router);

let statusCode = 0
if(error) {
statusCode = error.code
} else {
if(response.statusCode) {
statusCode = response.statusCode
}
}

let metricJSON = {
startTime: startTime,
endTime: endTime,
code: statusCode,
method: requestOptions.method,
headers: requestOptions.headers,
uri: requestOptions.uri,
route: targetRequest.route,
}
if (!router.meta) {
metricJSON.request = targetRequest.requestDetails._buffer;
metricJSON.response = body;
}
let metricBody = JSON.stringify(metricJSON)
let headers = getHeaders(router, 'metric')
// Sign request for hook
headers['x-hook-signature'] = 'sha256='
+ signature('sha256', metricBody, router.secureKey);
return {
uri: router.url + targetRequest.path,
method: 'NOTIFY',
headers: headers,
body: metricBody
}
}
let callbackMetricRequest = function(err, response, body){
// No action on broadcast hook.
if (err) {
debug.log('metric failed %O', err);
}
debug.log('metric sent');
debug.debugMetric('Metric targetRequest %O ', targetRequest);
// If more in queue left - send more
if (metricTargets.length) {
_request(getMetricRequest, callbackMetricRequest, targetRequest, true)
}
}
_request(getMetricRequest, callbackMetricRequest, targetRequest, true)
} else {
debug.debugMetric('no metric enpoints');
}
} else {
debug.debugMetric('metric disabled');
}

if (error) {
// TODO add limit to re send
debug.debug('_request Error received: %O', error);
debug.debug('_request Restart request: %O', requestOptions);
return _request(getRequest, callback);
return _request(getRequest, callback, targetRequest);
}

debug.debug('%s body: %s', requestOptions.uri, body);
Expand Down Expand Up @@ -880,7 +990,7 @@ function proxyRequest(route, path, method, jsonData, requestDetails, callback) {

})
}
_request(getEndpointRequest, callbackEndpointRequest)
_request(getEndpointRequest, callbackEndpointRequest, targetRequest)

})
}
Expand Down
5 changes: 2 additions & 3 deletions schema/service.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
},
"type": {
"type": "string",
"enum": ["adapter", "notify", "broadcast"],
"enum": ["adapter", "notify", "broadcast", "metric"],
"required": true
},
"group": {
Expand Down Expand Up @@ -107,8 +107,7 @@
"items": {
"type": "string"
},
"minItems": 1,
"required": true
"minItems": 1
},
"url": {
"type": "string",
Expand Down

0 comments on commit 9bdb0c5

Please sign in to comment.