Skip to content

Commit

Permalink
#6 metric hook support prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
Gormartsen committed Mar 9, 2019
1 parent 8280fdd commit ac881e0
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 11 deletions.
100 changes: 90 additions & 10 deletions router-proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -387,10 +387,10 @@ function hookCall(targetRequest, phase, callback) {
debug.log('broadcast sent');
// If more in queue left - send more
if (broadcastTargets.length) {
_request(getBroadcastRequest, callbackBroadcastRequest)
_request(getBroadcastRequest, callbackBroadcastRequest, targetRequest)
}
}
_request(getBroadcastRequest, callbackBroadcastRequest)
_request(getBroadcastRequest, callbackBroadcastRequest, targetRequest)
}

// send Notify
Expand Down Expand Up @@ -452,10 +452,10 @@ function hookCall(targetRequest, phase, callback) {
// If more groups left - send more
if (notifyGroups.length) {
currentNotifyGroup = notifyGroups.shift()
_request(getNotifyRequest, callbackNotifyRequest)
_request(getNotifyRequest, callbackNotifyRequest, targetRequest)
}
}
_request(getNotifyRequest, callbackNotifyRequest)
_request(getNotifyRequest, callbackNotifyRequest, targetRequest)
}
}

Expand Down Expand Up @@ -571,12 +571,12 @@ function hookCall(targetRequest, phase, callback) {
// If more groups left - send more
if (adapterGroups.length) {
currentAdapterGroup = adapterGroups.shift()
return _request(getAdapterRequest, callbackAdapterRequest)
return _request(getAdapterRequest, callbackAdapterRequest, targetRequest)
}
// return back via callback
callback()
}
_request(getAdapterRequest, callbackAdapterRequest)
_request(getAdapterRequest, callbackAdapterRequest, targetRequest)

}

Expand All @@ -597,7 +597,7 @@ function findHookTarget(targetRequest, phase, type, group){
continue
}
for (let hook of target.hook) {
if (hook.phase !== phase) {
if (phase !== null && hook.phase !== phase) {
continue
}
if (hook.type !== type) {
Expand Down Expand Up @@ -701,7 +701,7 @@ function getMinLoadedRouter(availableRoutes) {
return minRouter;
}

function _request(getRequest, callback) {
function _request(getRequest, callback, targetRequest, noMetric) {
let requestOptions = getRequest()

if (requestOptions instanceof Error) {
Expand All @@ -716,12 +716,92 @@ function _request(getRequest, callback) {
if (!(uri.host || (uri.hostname && uri.port)) && !uri.isUnix) {
return callback(new Error('Invalid URI' + requestOptions.uri))
}

let getHeaders = function(router, hookType){
let headers = {};
// TODO verify date,content-type, transfer-encoding headers
let skipHeaders = [
'host',
'content-type',
'date',
'connection',
'content-length',
'transfer-encoding'
]
for (var i in targetRequest.requestDetails.headers) {
if (skipHeaders.indexOf(i) != -1) {
continue
}
headers[i] = targetRequest.requestDetails.headers[i];
}
for (var i in router.matchVariables) {
headers['mfw-' + i] = router.matchVariables[i];
}
headers['x-origin-url'] = targetRequest.route
headers['x-origin-method'] = targetRequest.method
headers['x-hook-type'] = hookType
headers['x-endpoint-scope'] = targetRequest.endpoint.scope
debug.debug('%s headers %O', targetRequest.route, headers);
return headers;
}
let startTime = Date.now();
request(requestOptions, function(error, response, body) {
let endTime = Date.now();
if(!noMetric) {
let metricTargets = findHookTarget(targetRequest, null, 'metric')
debug.debug('NOTIFY: for %s result: %O', targetRequest.route, metricTargets);

if (metricTargets instanceof Array) {
let getMetricRequest = function(){
if (metricTargets instanceof Error) {
return metricTargets
}
if (!metricTargets.length) {
return false
}
let router = metricTargets.pop()
debug.log('Metric route %s result %O', targetRequest.route, router);
let headers = getHeaders(router, 'metric')
let metricBody = "";
if(!router.meta) {
metricBody = JSON.stringify({
request: targetRequest.requestDetails._buffer,
response: body,
startTime: startTime,
endTime: endTime,
headers: requestOptions.headers
})
}
// Sign request for hook
headers['x-hook-signature'] = 'sha256='
+ signature('sha256', metricBody, router.secureKey);
return {
uri: router.url + targetRequest.path,
method: 'NOTIFY',
headers: headers,
body: targetRequest.requestDetails._buffer
}
}
let callbackMetricRequest = function(err, response, body){
// No action on broadcast hook.
if (err) {
debug.log('metric failed %O', err);
}
debug.log('metric sent');
// If more in queue left - send more
if (metricTargets.length) {
_request(getMetricRequest, callbackMetricRequest, targetRequest, true)
}
}
_request(getMetricRequest, callbackMetricRequest, targetRequest, true)
}
}

if (error) {
// TODO add limit to re send
debug.debug('_request Error received: %O', error);
debug.debug('_request Restart request: %O', requestOptions);
return _request(getRequest, callback);
return _request(getRequest, callback, targetRequest);
}

debug.debug('%s body: %s', requestOptions.uri, body);
Expand Down Expand Up @@ -880,7 +960,7 @@ function proxyRequest(route, path, method, jsonData, requestDetails, callback) {

})
}
_request(getEndpointRequest, callbackEndpointRequest)
_request(getEndpointRequest, callbackEndpointRequest, targetRequest)

})
}
Expand Down
2 changes: 1 addition & 1 deletion schema/service.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
},
"type": {
"type": "string",
"enum": ["adapter", "notify", "broadcast"],
"enum": ["adapter", "notify", "broadcast", "metric"],
"required": true
},
"group": {
Expand Down

0 comments on commit ac881e0

Please sign in to comment.