From 8a381a779bd49f9724c1d541ca800f1291c77b57 Mon Sep 17 00:00:00 2001 From: Pierre Date: Wed, 8 May 2019 13:48:17 +0200 Subject: [PATCH 1/8] by default do not log on info --- lib/worker.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/worker.js b/lib/worker.js index 747cbf8..5d03d84 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -37,7 +37,7 @@ function Worker(options) { this.workerName = options.workerName; this.logger = options.logger || { debug() {}, - info: console.log, + info() {}, warn: console.warn, error: console.error }; From 1557b35a56bfaaa59a915624c8f6c4fc525849d7 Mon Sep 17 00:00:00 2001 From: Pierre Date: Wed, 15 May 2019 10:46:12 +0200 Subject: [PATCH 2/8] new design : * 1 worker - n poolers * 1 worker - n tasks * Use promises in internal logic * pooler is asking worker for renewal permission * All task logic is in task --- README.md | 107 ++++++++++++++- lib/pooler.js | 137 +++++++++--------- lib/task.js | 87 +++++++++++- lib/worker.js | 275 +++++++++++++++++++++++-------------- test/scenarios/issue-16.js | 149 ++++++++++++++++++++ test/scenarios/test.js | 11 +- 6 files changed, 575 insertions(+), 191 deletions(-) create mode 100644 test/scenarios/issue-16.js diff --git a/README.md b/README.md index 686f2d6..d34f0e0 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,7 @@ [![codecov](https://codecov.io/gh/piercus/step-function-worker/branch/master/graph/badge.svg)](https://codecov.io/gh/piercus/step-function-worker) # step-function-worker + Create a nodejs aws step-function worker/pooler easily :-) ## install @@ -16,32 +17,48 @@ npm install step-function-worker #### Basic example ```javascript -var fn = function(input, cb, heartbeat){ +const fn = function(input, cb, heartbeat){ // do something doSomething(input) - // call heartbeat sometime to avoid timeout + // call heartbeat to avoid timeout heartbeat() // call callback in the end cb(null, {"foo" : "bar"}); // output must be compatible with JSON.stringify }; -var worker = new StepFunctionWorker({ +const worker = new StepFunctionWorker({ activityArn : '', workerName : 'workerName', fn : fn, - concurrency : 2 // default is 1 + taskConcurrency : 22, // default is null = Infinity + poolConcurrency : 2 // default is 1 }); ``` + +### Concurrency management + +Since version **3.0**, `concurrency` has been replaced by `poolConcurrency` and `taskConcurrency`. + +* `taskConcurrency` (`null` means Infinite) + +It represent the maximum number of parallel tasks done by the worker (default: `null`). + +* `poolConcurrency` is the maximum number of parallel getActivity, http request (see [`sdk.getActivity`](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/StepFunctions.html#getActivityTask-property)) (default: `1`) + +Increase this to have a more responsive worker. + +Anyway, you should always have `poolConcurrency` < `taskConcurrency`. + #### Set the Region By default, this package is built on top of `aws-sdk` so you should set your AWS Region by changing `AWS_REGION` environment variable. If you want to set it in JS code directly you can do it using `awsConfig` (see https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Config.html to see all available options) like -``` -var worker = new StepFunctionWorker({ +```javascript +const worker = new StepFunctionWorker({ activityArn : '', workerName : 'workerName', fn : fn, @@ -61,36 +78,114 @@ worker.close(function(){ }) ``` +#### Get info on current worker + +```javascript +// A worker as multiple poolers and multiple running tasks +// You can have infos about it by doing +const {poolers, tasks} = worker.report(); + +// poolers is an array of { +// startTime: , +// workerName: , +// status: +// } +// +// tasks is an array of { +// taskToken: , +// input: , +// startTime: +// } +// +``` + +#### Custom logging with winston + +You can customize logging by using a [winston](https://www.npmjs.com/package/winston) logger (or winston-like logger) as input + +```javascript +const winston = require('winston'); + +const logger = winston.createLogger({ + level: 'debug', + format: winston.format.json(), + defaultMeta: { service: 'user-service' }, + transports: [ + // + // - Write to all logs with level `info` and below to `combined.log` + // - Write all logs error (and below) to `error.log`. + // + new winston.transports.File({ filename: 'error.log', level: 'error' }), + new winston.transports.File({ filename: 'combined.log' }) + ] +}); + +const worker = new StepFunctionWorker({ + activityArn : '', + workerName : 'workerName', + fn : fn, + logger +}); +``` + +Alternatively, you can just use a winston-like logger + +```javascript +const logger = console; + +const worker = new StepFunctionWorker({ + activityArn : '', + workerName : 'workerName', + fn : fn, + logger +}); +``` + #### Events ```javascript +// when a task starts worker.on('task', function(task){ // task.taskToken // task.input console.log("task ", task.input) }); +// when a task fails worker.on('failure', function(failure){ // out.error // out.taskToken console.log("Failure :",failure.error) }); +// when a heartbeat signal is sent worker.on('heartbeat', function(beat){ // out.taskToken console.log("Heartbeat"); }); +// when a task succeed worker.on('success', function(out){ // out.output // out.taskToken console.log("Success :",out.output) }); +// when an error happens worker.on('error', function(err){ console.log("error ", err) }); + +// when the worker has no more task to process +worker.on('empty', function(){ + console.log("error ", err) +}); + +// when the worker reaches taskConcurrency tasks +worker.on('full', function(err){ + console.log("error ", err) +}); ``` ### Documentation diff --git a/lib/pooler.js b/lib/pooler.js index 86b38aa..ced4bb7 100644 --- a/lib/pooler.js +++ b/lib/pooler.js @@ -1,6 +1,5 @@ const util = require('util'); -const {EventEmitter} = require('events'); -const Task = require('./task.js'); +const crypto = require('crypto'); /** * @class Pooler @@ -11,35 +10,26 @@ const Task = require('./task.js'); * */ function Pooler(options) { - EventEmitter.call(this); - - this._running = true; - this._task = false; + this.id = crypto.randomBytes(3).toString('hex'); this.logger = options.logger; + this.startTime = new Date(); this.activityArn = options.activityArn; this.worker = options.worker; this.index = options.index; this.workerName = options.workerName && (options.workerName + '-' + this.index); - this._request = null; - this.pool(); + this.logger.debug(`new pooler ${this.id}`) + this.getActivityTask(); } -Pooler.prototype.stop = function (cb) { - this._running = false; - if (this._task) { - this._task.removeAllListeners(); - } - - if (this._request) { - this.on('stopPooling', () => { - this.removeAllListeners(); - cb(); - }); - // This would be better approach but it does not seem to work - // this._request.abort(); - } else { - cb(); +Pooler.prototype.stop = function () { + this.logger.debug(`Pooler (${this.id}): Stop`) + + if(!this._stoppingPromise){ + this._stoppingPromise = (this._requestPromise || Promise.resolve()).then(() => { + this._stopped = true; + }) } + return this._stoppingPromise; }; /** @@ -54,68 +44,73 @@ Pooler.prototype.stop = function (cb) { */ Pooler.prototype.report = function () { return { - workerName: this.workerName, - status: (this._task ? 'Task under going' : (this._running ? 'Waiting for Tasks' : 'Paused')), - task: this._task && this._task.report() + id: this.id, + startTime: this.startTime, + status: (this._stopped ? 'Stopped' : 'Running') }; }; Pooler.prototype.restart = function () { - this._running = true; - this.pool(); -}; - -Pooler.prototype.pool = function () { - if (this._running) { - if (this._task) { - throw (new Error('pool should not be called when task on going')); - } - - if (this._request) { - throw (new Error('pool should not be called when request on going')); - } - + return this.stop().then(() => { + this._stopped = false; this.getActivityTask(); - } else { - this.emit('stopPooling'); - } + return Promise.resolve(); + }) }; Pooler.prototype.getActivityTask = function () { - this.logger.debug(this.workerName + ' getActivityTask ' + this.activityArn); - this._request = this.worker.stepfunction.getActivityTask({ - activityArn: this.activityArn, - workerName: this.workerName - }, (err, data) => { - this._request = null; - if (err) { + //this.logger.info('getActivityTask'); + + //this.logger.debug(this.workerName + ' getActivityTask ' + this.activityArn); + if(this._stopped){ + return Promise.reject(`Pooler (${this.id}) is stopped`) + } + if(!this._requestPromise){ + this.logger.debug(`Pooler (${this.id}): getActivityTask`) + + this._requestPromise = this.worker.stepfunction.getActivityTask({ + activityArn: this.activityArn, + workerName: this.workerName + }).promise() + .then(data => { + if (data.taskToken && typeof (data.taskToken) === 'string' && data.taskToken.length > 1) { + this.logger.debug(`Pooler (${this.id}): Activity task received (${data.taskToken.slice(0,10)})`) + const params = Object.assign({}, data, { + input: JSON.parse(data.input), + workerName: this.workerName, + poolerId: this.id + }); + return this.worker.addTask(params) + } else { + this.logger.debug(`Pooler (${this.id}): No activity task received`) + return Promise.resolve() + } + }) + .then(() => { + this._requestPromise = null; + const renewal = this.worker.renewPooler(this); + if(!renewal){ + this.stop(); + this.worker.removePooler(this) + return Promise.resolve() + } else { + return this.getActivityTask() + } + }) + .catch(err => { // Console.log(err); + this.logger.error(`Pooler (${this.id}):`, err) if (err.code === 'RequestAbortedError') { // In case of abort, close silently } else { - this.emit('error', err); + this.worker.emit('error', err); } - return; - } - - if (data.taskToken && typeof (data.taskToken) === 'string' && data.taskToken.length > 1) { - const params = Object.assign({}, data, {input: JSON.parse(data.input), workerName: this.workerName}); - - this.worker.emit('task', params); - - this._task = new Task(Object.assign({}, params, {worker: this.worker, logger: this.logger})); - - this._task.once('finish', () => { - this._task = null; - this.pool(); - }); - } else { - this.pool(); - } - }); + //return Promise.reject(err); + }); + } else { + return this._requestPromise + } }; -util.inherits(Pooler, EventEmitter); - module.exports = Pooler; diff --git a/lib/task.js b/lib/task.js index 9785771..6232f5b 100644 --- a/lib/task.js +++ b/lib/task.js @@ -1,5 +1,6 @@ const {EventEmitter} = require('events'); const util = require('util'); +const replaceError = require('./replace-error.js'); /** * @class StepFunctionWorker @@ -16,11 +17,13 @@ function Task(options) { this.logger = options.logger; this.worker = options.worker; + this.stepfunction = this.worker.stepfunction; this.input = options.input; this.taskToken = options.taskToken; this.workerName = options.workerName; this.startTime = new Date(); - this.worker.execute(this.input, this.taskCallback.bind(this), this.heartbeat.bind(this)); + this._finished = false; + this._execute(this.input, this.taskCallback.bind(this), this.heartbeat.bind(this)); } Task.prototype.taskCallback = function (err, res) { @@ -53,33 +56,105 @@ Task.prototype.report = function () { }; Task.prototype.succeed = function (res) { - this.worker.succeed({ + this.logger.debug(`Succeed (${this.input.index})`) + this._succeed({ input: this.input, output: res, taskToken: this.taskToken, workerName: this.workerName }); - this.emit('finish'); + this._finished = true; }; Task.prototype.fail = function (err) { - this.worker.fail({ + this._fail({ error: err, input: this.input, taskToken: this.taskToken, workerName: this.workerName }); - this.emit('finish'); + this._finished = true; }; Task.prototype.heartbeat = function () { - this.worker.heartbeat({ + this.logger.debug(`Heartbeat (${this.input.index})`) + + this._heartbeat({ input: this.input, taskToken: this.taskToken, workerName: this.workerName }); }; + +Task.prototype._execute = function (input, cb, heartbeat) { + setImmediate(() => { + try { + this.worker.fn(input, cb, heartbeat); + } catch (error) { + cb(error); + } + }); +}; + +Task.prototype._succeed = function (res) { + const params = Object.assign({}, res, {output: JSON.stringify(res.output)}); + delete params.workerName; + delete params.input; + this.stepfunction.sendTaskSuccess(params, err => { + if (err) { + this.logger.error(`Cannot sendTaskSuccess`, err) + this.worker.emit('error', {err, input: res.input}); + } else { + this.worker.emit('success', res); + } + }); +}; + +Task.prototype._fail = function (res) { + let error = JSON.stringify(res.error, replaceError); + + if (error.length > 256) { + // Otherwise aws sdk will tell + // failed to satisfy constraint: Member must have length less than or equal to 256 + error = error.slice(0, 253) + '...'; + } + + const params = Object.assign({}, res, {error}); + delete params.workerName; + delete params.input; + //this.logger.debug('sendTaskFailure', res.error); + this.stepfunction.sendTaskFailure(params, err => { + if (err) { + this.worker.emit('error', {err, input: res.input}); + } else { + this.worker.emit('failure', res); + } + }); +}; + +Task.prototype._heartbeat = function (res) { + const params = Object.assign({}, res); + delete params.workerName; + delete params.input; + //this.logger.debug('sendTaskHeartbeat'); + + this.stepfunction.sendTaskHeartbeat(params, err => { + if (err) { + if(err.code === 'TaskTimedOut' && this._finished){ + this.logger.warn( + `Heartbeat response received after task is finished (succeed or failed) + To remove this warning make sure to not send heartbeat() just before calling cb()` + ); + } else { + this.worker.emit('error', {err, input: res.input}); + } + } else { + this.worker.emit('heartbeat', res); + } + }); +}; + util.inherits(Task, EventEmitter); module.exports = Task; diff --git a/lib/worker.js b/lib/worker.js index 5d03d84..17e93ff 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -4,7 +4,8 @@ const AWS = require('aws-sdk'); const parser = require('aws-arn-parser'); const Pooler = require('./pooler.js'); -const replaceError = require('./replace-error.js'); +const Task = require('./task.js'); + /** * @typedef {Object} AWSConfig see https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Config.html */ @@ -30,8 +31,13 @@ function Worker(options) { if (!options.activityArn) { throw (new Error('activityArn is mandatory inside Worker')); } + + if(typeof(options.concurrency) === 'number'){ + throw(new Error('step-function-worker is not supporting `concurrency` parameter since version 3.0, see README.md')) + } - this.concurrency = typeof (options.concurrency) === 'number' ? options.concurrency : 1; + this.poolConcurrency = typeof (options.poolConcurrency) === 'number' ? options.poolConcurrency : 1; + this.taskConcurrency = typeof (options.taskConcurrency) === 'number' ? options.taskConcurrency : null; this.activityArn = options.activityArn; this.workerName = options.workerName; @@ -43,6 +49,7 @@ function Worker(options) { }; this.fn = options.fn; this._poolers = []; + this._tasks = []; if (typeof (this.fn) !== 'function') { throw (new TypeError('worker does not define any function')); @@ -56,10 +63,15 @@ function Worker(options) { if (this.autoStart) { setImmediate(() => { - this.start(() => { - // Do nothing - this.emit('ready'); - }); + this.start() + .then(() => { + // Do nothing + this.emit('ready'); + }) + .catch(err => { + this.logger.error(`Worker failed to start`, err) + this.emit('error', err); + }) }); } } @@ -68,11 +80,10 @@ function Worker(options) { * Start the worker pooling for new tasks * @param {function} cb callback(err) */ -Worker.prototype.start = function (cb) { - this.updatePool(err => { - this.logger.info('Worker started'); - cb(err); - }); +Worker.prototype.start = function () { + this.increasePool() + this.logger.info('Worker started'); + return Promise.resolve() }; /** @@ -80,25 +91,92 @@ Worker.prototype.start = function (cb) { * @return {Array.} list of poolers */ Worker.prototype.report = function () { - return this._poolers.map(pooler => { - return pooler.report(); - }); + return { + poolers: this._poolers.map(pooler => { + return pooler.report(); + }), + tasks: this._tasks.map(task => { + return task.report(); + }) + }; +}; + +Worker.prototype.renewPooler = function(pooler){ + const maxNumberOfPools = this.getMaxNumberOfPools(); + + if(this._poolers.length > maxNumberOfPools){ + const index = this._poolers.indexOf(pooler); + if(index === -1){ + throw(new Error('cannot removed non-listed pooler')) + } + return false; + } else { + this.increasePool() + return true; + } +} + +Worker.prototype.getMaxNumberOfPools = function (cb) { + let maxNumberOfPools = this.poolConcurrency; + if(typeof(this.taskConcurrency) === 'number'){ + maxNumberOfPools = Math.min(this.taskConcurrency - this._tasks.length, this.poolConcurrency); + } + if(maxNumberOfPools < 0){ + throw(new Error(`maxNumberOfPools (${maxNumberOfPools}) should be positive`)) + } + return maxNumberOfPools; }; -Worker.prototype.updatePool = function (cb) { - if (this._poolers.length < this.concurrency) { +Worker.prototype.increasePool = function () { + const maxNumberOfPools = this.getMaxNumberOfPools(); + this.logger.debug('increasePool started', maxNumberOfPools, this._poolers.length); + + if (this._poolers.length < maxNumberOfPools) { this.addPooler(this._poolers.length); - this.updatePool(cb); - } else if (this._poolers.length > this.concurrency) { - this.removePooler(() => { - this.updatePool(cb); - }); + return this.increasePool() + } else if (this._poolers.length > maxNumberOfPools) { + return false } else { - cb(); + return true + } +}; + +Worker.prototype.addTask = function (params) { + //this.logger.count('addTask'); + const task = new Task(Object.assign({}, params, {worker: this, logger: this.logger})); + this._tasks.push(task); + this.emit('task', params); + task.on('finish', () => { + //this.logger.count('finishTask'); + const index = this._tasks.indexOf(task); + if (index === -1) { + throw (new Error('tasks is not registered in _tasks')); + } + this._tasks.splice(index, 1); + this.updateTasks(); + this.increasePool(); + }); + this.updateTasks(); +}; + + +Worker.prototype.updateTasks = function () { + if(typeof(this.taskConcurrency) === 'number'){ + if (this._tasks.length === this.taskConcurrency) { + this.emit('full'); + } else if(this._tasks.length > this.taskConcurrency){ + throw(new Error(`Should not reach ${this._tasks.length} tasks`)) + } + } + + if (this._tasks.length === 0) { + this.logger.info('empty') + this.emit('empty'); } }; Worker.prototype.addPooler = function (index) { + this.logger.debug(`addPooler`) const pooler = new Pooler({ activityArn: this.activityArn, workerName: this.workerName, @@ -107,18 +185,50 @@ Worker.prototype.addPooler = function (index) { index }); - pooler.on('error', err => { - this.emit('error', err); - }); - this._poolers.push(pooler); }; -Worker.prototype.removePooler = function (cb) { - const removedPooler = this._poolers.pop(); - removedPooler.stop(cb); +Worker.prototype.removePooler = function (pooler) { + this.logger.debug(`removePooler`) + + const index = this._poolers.indexOf(pooler); + if(index === -1){ + throw(new Error(`pooler ${pooler} is not in the pooler list`)) + } + this._poolers.splice(index, 1); + + if(this._poolers.length === 0){ + this.emit('empty-poolers') + } }; +// Worker.prototype.removePooler = function () { +// if(!this._poolerRemovalPromise){ +// this._poolerRemovalPromise = Promise.resolve() +// .then(() => { +// this.logger.debug('removePooler started') +// const removedPooler = this._poolers[this._poolers.length -1]; +// const id = Math.random(); +// const _this = this; +// return removedPooler.stop() +// }).then(() => { +// const index = _this._poolers.indexOf(removedPooler); +// if(index === -1){ +// throw(new Error('cross poolers removal is not expected')) +// } +// _this._poolers.splice(index, 1); +// return _this._poolers +// }).then(r => { +// this.logger.debug('removePooler ended') +// +// this._poolerRemovalPromise = null +// return r; +// }) +// } +// +// return this._poolerRemovalPromise; +// }; + /** * Close the worker, this function might take 60 seconds to finish to do step function design * remove all the events attached to the worker @@ -126,8 +236,14 @@ Worker.prototype.removePooler = function (cb) { */ Worker.prototype.close = function (cb) { - this.stop(cb); this.removeAllListeners(); + const promise = this.stop(); + + if(cb){ + promise.then(() => cb()).catch(cb) + } else { + return promise; + } }; /** @@ -137,86 +253,39 @@ Worker.prototype.close = function (cb) { * @param {function} callback */ -Worker.prototype.stop = function (cb) { +Worker.prototype.stop = function () { this.logger.info('Stopping the worker ... this might take 60 seconds'); - this.concurrency = 0; - this.updatePool(err => { - this.logger.info('Worker stopped'); - cb(err); - }); + this.poolConcurrency = 0; + if(!this._stoppingPromise){ + this._stoppingPromise = new Promise((resolve, reject) => { + const onEmpty = () => { + this.logger.info('Worker stopped'); + if (this._tasks.length > 0) { + const err = new Error('Some tasks are still ongoing, please make sure all the tasks are finished before stopping the worker'); + return reject(err); + } else { + return resolve(); + } + } + if(this._poolers.length === 0){ + onEmpty(); + } + this.once('empty-poolers', () => { + onEmpty(); + }) + }) + } + return this._stoppingPromise }; Worker.prototype.restart = function (cb) { - const oldConcurrency = this.concurrency; - this.stop(err => { - if (err) { - return cb(err); - } - - this.concurrency = oldConcurrency; - this.start(cb); + const oldPoolConcurrency = this.poolConcurrency; + return this.stop().then(() => { + this.poolConcurrency = oldPoolConcurrency; + return this.start(cb); }); }; -Worker.prototype.execute = function (input, cb, heartbeat) { - setImmediate(() => { - try { - this.fn(input, cb, heartbeat); - } catch (error) { - cb(error); - } - }); -}; - -Worker.prototype.succeed = function (res) { - const params = Object.assign({}, res, {output: JSON.stringify(res.output)}); - delete params.workerName; - delete params.input; - this.stepfunction.sendTaskSuccess(params, err => { - if (err) { - this.emit('error', {err, input: res.input}); - } else { - this.emit('success', res); - } - }); -}; - -Worker.prototype.fail = function (res) { - let error = JSON.stringify(res.error, replaceError); - - if (error.length > 256) { - // Otherwise aws sdk will tell - // failed to satisfy constraint: Member must have length less than or equal to 256 - error = error.slice(0, 253) + '...'; - } - - const params = Object.assign({}, res, {error}); - delete params.workerName; - delete params.input; - this.logger.debug('sendTaskFailure', res.error); - this.stepfunction.sendTaskFailure(params, err => { - if (err) { - this.emit('error', {err, input: res.input}); - } else { - this.emit('failure', res); - } - }); -}; - -Worker.prototype.heartbeat = function (res) { - const params = Object.assign({}, res); - delete params.workerName; - delete params.input; - this.logger.debug('sendTaskHeartbeat'); - - this.stepfunction.sendTaskHeartbeat(params, err => { - if (err) { - this.emit('error', {err, input: res.input}); - } else { - this.emit('heartbeat', res); - } - }); -}; util.inherits(Worker, EventEmitter); diff --git a/test/scenarios/issue-16.js b/test/scenarios/issue-16.js new file mode 100644 index 0000000..a233c5a --- /dev/null +++ b/test/scenarios/issue-16.js @@ -0,0 +1,149 @@ +const test = require('ava'); +const AWS = require('aws-sdk'); +const StepFunctionWorker = require('../..'); +const createActivity = require('../utils/create-activity'); +const cleanUp = require('../utils/clean-up'); +const winston = require('winston'); +const stepFunction = new AWS.StepFunctions(); +const workerName = 'test worker name'; +const stateMachineName = 'test-state-machine-' + Math.floor(Math.random() * 1000); +const activityName = 'test-step-function-worker-' + Math.floor(Math.random() * 1000); + +process.on('uncaughtException', err => { + console.log('uncaughtException', err); +}); +/* +{ + definition: '{"Comment":"An Example State machine using Activity.","StartAt":"FirstState","States":{"FirstState":{"Type":"Task","Resource":"arn:aws:states:eu-central-1:170670752151:activity:test-step-function-worker","TimeoutSeconds":300,"HeartbeatSeconds":60,"Next":"End"}}}', + name: 'test-state-machine', + roleArn: 'arn:aws:iam::170670752151:role/service-role/StatesExecutionRole-eu-central-1' +} +*/ + +const context = {}; + +const before = createActivity.bind(null, { + context, + activityName, + stateMachineName, + workerName +}); +const after = cleanUp.bind(null, context); + +const sentInput = function(i){ + return { + foo: 'bar', + index: i + }; +} +const sentOutput = {foo2: 'bar2'}; + +const taskDurationBase = 500; +const fn = function (event, callback, heartbeat) { + heartbeat(); + + const totalDuration = Math.ceil(Math.random()*taskDurationBase); + setTimeout(() => { + // Assert.equal(event, sentInput); + heartbeat(); + }, totalDuration); + setTimeout(() => { + // Assert.equal(event, sentInput); + heartbeat(); + }, 2*totalDuration); + setTimeout(() => { + // Assert.equal(event, sentInput); + heartbeat(); + }, 3*totalDuration); + setTimeout(() => { + // Assert.equal(event, sentInput); + heartbeat(); + }, 4*totalDuration); + setTimeout(() => { + // Assert.equal(event, sentInput); + heartbeat(); + }, 5*totalDuration); + setTimeout(() => { + // Assert.equal(event, sentInput); + callback(null, sentOutput); + }, 6*totalDuration); +}; + +test.before(before); + +test.serial('Step function Activity Worker with 200 parallel tasks and heartbeat', t => { + const {activityArn, stateMachineArn} = context; + const startDate = new Date(); + const totalTasks = 10; + const poolConcurrency = 3; + const taskConcurrency = 5; + const worker = new StepFunctionWorker({ + activityArn, + workerName: workerName + '-fn', + fn, + logger: new winston.Logger({ + level: 'debug', + transports: [ + new (winston.transports.Console)({ + timestamp: function() { + return (new Date()).toISOString().slice(11); + }, + formatter: function(options) { + // - Return string will be passed to logger. + // - Optionally, use options.colorize(options.level, ) to + // colorize output based on the log level. + return options.timestamp() + ' ' + + winston.config.colorize(options.level, options.level.toUpperCase()) + ' ' + + (options.message ? options.message : '') + + (options.meta && Object.keys(options.meta).length ? '\n\t'+ JSON.stringify(options.meta) : '' ); + } + }) + ] + }), + poolConcurrency, + taskConcurrency + }); + + const params = function(i){ + return { + stateMachineArn, + input: JSON.stringify(sentInput(i)) + }; + }; + let count = 0; + let countFull = 0; + worker.on('task', () => { + count++; + }); + worker.on('full', () => { + countFull++; + const report = worker.report(); + t.is(report.tasks.length, taskConcurrency); + }); + const promises = []; + for (let i = 0; i < totalTasks; i++) { + promises.push(stepFunction.startExecution(params(i)).promise()); + } + + return new Promise((resolve, reject) => { + worker.once('empty', () => { + t.is(count, totalTasks); + console.log({ + countFull, + totalTasks, + taskConcurrency + }) + //t.is(Math.abs(countFull - (totalTasks-taskConcurrency))/totalTasks) + const endDate = new Date(); + worker.logger.info(`Spent ${(endDate - startDate) / 1000} seconds`); + worker.close(() => { + resolve(); + }); + }); + worker.on('error', reject); + + return Promise.all(promises); + }); +}); + +//test.after(after); diff --git a/test/scenarios/test.js b/test/scenarios/test.js index db8d942..7f7db50 100644 --- a/test/scenarios/test.js +++ b/test/scenarios/test.js @@ -93,14 +93,14 @@ test.serial('Step function Activity Worker with 2 consecutive tasks', t => { }); }); -test.serial('Step function with 3 concurrent worker', t => { +test.serial('Step function with 3 poolConcurrency worker', t => { const {activityArn, stateMachineArn} = context; const worker = new StepFunctionWorker({ activityArn, - workerName: workerName + '-concurrent', + workerName: workerName + '-poolConcurrency', fn: fn2, - concurrency: 3 + poolConcurrency: 3 }); const params1 = { stateMachineArn, @@ -144,7 +144,8 @@ test.serial('Step function with 3 concurrent worker', t => { if (countSuccess === 1) { const report = worker.report(); - t.is(report.length, 3); + t.is(report.poolers.length, 3); + t.is(report.tasks.length, 0); } if (countSuccess === 3) { @@ -175,7 +176,7 @@ test.serial('Restart the worker', t => { activityArn, workerName: workerName + '-restart', fn: fn2, - concurrency: 1 + poolConcurrency: 1 }); const params1 = { stateMachineArn, From 291638f5f5e6c0af234ee544981ebf54413c6af2 Mon Sep 17 00:00:00 2001 From: Pierre Date: Wed, 15 May 2019 12:22:32 +0200 Subject: [PATCH 3/8] xo --- lib/pooler.js | 99 ++++++++++++++--------------- lib/task.js | 15 +++-- lib/worker.js | 125 ++++++++++++++++++++----------------- test/scenarios/issue-16.js | 69 ++++++++++---------- 4 files changed, 160 insertions(+), 148 deletions(-) diff --git a/lib/pooler.js b/lib/pooler.js index ced4bb7..280a14d 100644 --- a/lib/pooler.js +++ b/lib/pooler.js @@ -1,4 +1,3 @@ -const util = require('util'); const crypto = require('crypto'); /** @@ -17,18 +16,19 @@ function Pooler(options) { this.worker = options.worker; this.index = options.index; this.workerName = options.workerName && (options.workerName + '-' + this.index); - this.logger.debug(`new pooler ${this.id}`) + this.logger.debug(`new pooler ${this.id}`); this.getActivityTask(); } Pooler.prototype.stop = function () { - this.logger.debug(`Pooler (${this.id}): Stop`) - - if(!this._stoppingPromise){ + this.logger.debug(`Pooler (${this.id}): Stop`); + + if (!this._stoppingPromise) { this._stoppingPromise = (this._requestPromise || Promise.resolve()).then(() => { this._stopped = true; - }) + }); } + return this._stoppingPromise; }; @@ -55,62 +55,63 @@ Pooler.prototype.restart = function () { this._stopped = false; this.getActivityTask(); return Promise.resolve(); - }) + }); }; Pooler.prototype.getActivityTask = function () { - //this.logger.info('getActivityTask'); + // This.logger.info('getActivityTask'); - //this.logger.debug(this.workerName + ' getActivityTask ' + this.activityArn); - if(this._stopped){ - return Promise.reject(`Pooler (${this.id}) is stopped`) + // this.logger.debug(this.workerName + ' getActivityTask ' + this.activityArn); + if (this._stopped) { + return Promise.reject(new Error(`Pooler (${this.id}) is stopped`)); } - if(!this._requestPromise){ - this.logger.debug(`Pooler (${this.id}): getActivityTask`) - + + if (!this._requestPromise) { + this.logger.debug(`Pooler (${this.id}): getActivityTask`); + this._requestPromise = this.worker.stepfunction.getActivityTask({ activityArn: this.activityArn, workerName: this.workerName }).promise() - .then(data => { - if (data.taskToken && typeof (data.taskToken) === 'string' && data.taskToken.length > 1) { - this.logger.debug(`Pooler (${this.id}): Activity task received (${data.taskToken.slice(0,10)})`) - const params = Object.assign({}, data, { - input: JSON.parse(data.input), - workerName: this.workerName, - poolerId: this.id - }); - return this.worker.addTask(params) - } else { - this.logger.debug(`Pooler (${this.id}): No activity task received`) - return Promise.resolve() - } - }) - .then(() => { - this._requestPromise = null; - const renewal = this.worker.renewPooler(this); - if(!renewal){ - this.stop(); - this.worker.removePooler(this) - return Promise.resolve() - } else { - return this.getActivityTask() - } - }) - .catch(err => { + .then(data => { + if (data.taskToken && typeof (data.taskToken) === 'string' && data.taskToken.length > 1) { + this.logger.debug(`Pooler (${this.id}): Activity task received (${data.taskToken.slice(0, 10)})`); + const params = Object.assign({}, data, { + input: JSON.parse(data.input), + workerName: this.workerName, + poolerId: this.id + }); + return this.worker.addTask(params); + } + + this.logger.debug(`Pooler (${this.id}): No activity task received`); + return Promise.resolve(); + }) + .then(() => { + this._requestPromise = null; + const renewal = this.worker.renewPooler(this); + if (!renewal) { + this.stop(); + this.worker.removePooler(this); + return Promise.resolve(); + } + + return this.getActivityTask(); + }) + .catch(error => { // Console.log(err); - this.logger.error(`Pooler (${this.id}):`, err) - if (err.code === 'RequestAbortedError') { + this.logger.error(`Pooler (${this.id}):`, error); + if (error.code === 'RequestAbortedError') { // In case of abort, close silently - } else { - this.worker.emit('error', err); - } + } else { + this.worker.emit('error', error); + } - //return Promise.reject(err); - }); - } else { - return this._requestPromise + // Return Promise.reject(err); + }); } + + return this._requestPromise; }; module.exports = Pooler; diff --git a/lib/task.js b/lib/task.js index 6232f5b..ed0b1b1 100644 --- a/lib/task.js +++ b/lib/task.js @@ -56,7 +56,7 @@ Task.prototype.report = function () { }; Task.prototype.succeed = function (res) { - this.logger.debug(`Succeed (${this.input.index})`) + this.logger.debug(`Succeed (${this.input.index})`); this._succeed({ input: this.input, output: res, @@ -77,8 +77,8 @@ Task.prototype.fail = function (err) { }; Task.prototype.heartbeat = function () { - this.logger.debug(`Heartbeat (${this.input.index})`) - + this.logger.debug(`Heartbeat (${this.input.index})`); + this._heartbeat({ input: this.input, taskToken: this.taskToken, @@ -86,7 +86,6 @@ Task.prototype.heartbeat = function () { }); }; - Task.prototype._execute = function (input, cb, heartbeat) { setImmediate(() => { try { @@ -103,7 +102,7 @@ Task.prototype._succeed = function (res) { delete params.input; this.stepfunction.sendTaskSuccess(params, err => { if (err) { - this.logger.error(`Cannot sendTaskSuccess`, err) + this.logger.error('Cannot sendTaskSuccess', err); this.worker.emit('error', {err, input: res.input}); } else { this.worker.emit('success', res); @@ -123,7 +122,7 @@ Task.prototype._fail = function (res) { const params = Object.assign({}, res, {error}); delete params.workerName; delete params.input; - //this.logger.debug('sendTaskFailure', res.error); + // This.logger.debug('sendTaskFailure', res.error); this.stepfunction.sendTaskFailure(params, err => { if (err) { this.worker.emit('error', {err, input: res.input}); @@ -137,11 +136,11 @@ Task.prototype._heartbeat = function (res) { const params = Object.assign({}, res); delete params.workerName; delete params.input; - //this.logger.debug('sendTaskHeartbeat'); + // This.logger.debug('sendTaskHeartbeat'); this.stepfunction.sendTaskHeartbeat(params, err => { if (err) { - if(err.code === 'TaskTimedOut' && this._finished){ + if (err.code === 'TaskTimedOut' && this._finished) { this.logger.warn( `Heartbeat response received after task is finished (succeed or failed) To remove this warning make sure to not send heartbeat() just before calling cb()` diff --git a/lib/worker.js b/lib/worker.js index 17e93ff..480c7ad 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -31,9 +31,9 @@ function Worker(options) { if (!options.activityArn) { throw (new Error('activityArn is mandatory inside Worker')); } - - if(typeof(options.concurrency) === 'number'){ - throw(new Error('step-function-worker is not supporting `concurrency` parameter since version 3.0, see README.md')) + + if (typeof (options.concurrency) === 'number') { + throw (new TypeError('step-function-worker is not supporting `concurrency` parameter since version 3.0, see README.md')); } this.poolConcurrency = typeof (options.poolConcurrency) === 'number' ? options.poolConcurrency : 1; @@ -68,10 +68,10 @@ function Worker(options) { // Do nothing this.emit('ready'); }) - .catch(err => { - this.logger.error(`Worker failed to start`, err) - this.emit('error', err); - }) + .catch(error => { + this.logger.error('Worker failed to start', error); + this.emit('error', error); + }); }); } } @@ -79,11 +79,12 @@ function Worker(options) { /** * Start the worker pooling for new tasks * @param {function} cb callback(err) +* @returns {Promise} */ Worker.prototype.start = function () { - this.increasePool() + this.increasePool(); this.logger.info('Worker started'); - return Promise.resolve() + return Promise.resolve(); }; /** @@ -101,57 +102,63 @@ Worker.prototype.report = function () { }; }; -Worker.prototype.renewPooler = function(pooler){ +Worker.prototype.renewPooler = function (pooler) { const maxNumberOfPools = this.getMaxNumberOfPools(); - - if(this._poolers.length > maxNumberOfPools){ + + if (this._poolers.length > maxNumberOfPools) { const index = this._poolers.indexOf(pooler); - if(index === -1){ - throw(new Error('cannot removed non-listed pooler')) + if (index === -1) { + throw (new Error('cannot removed non-listed pooler')); } + return false; - } else { - this.increasePool() - return true; } -} -Worker.prototype.getMaxNumberOfPools = function (cb) { + this.increasePool(); + return true; +}; + +Worker.prototype.getMaxNumberOfPools = function () { let maxNumberOfPools = this.poolConcurrency; - if(typeof(this.taskConcurrency) === 'number'){ + if (typeof (this.taskConcurrency) === 'number') { maxNumberOfPools = Math.min(this.taskConcurrency - this._tasks.length, this.poolConcurrency); } - if(maxNumberOfPools < 0){ - throw(new Error(`maxNumberOfPools (${maxNumberOfPools}) should be positive`)) + + if (maxNumberOfPools < 0) { + throw (new Error(`maxNumberOfPools (${maxNumberOfPools}) should be positive`)); } + return maxNumberOfPools; }; Worker.prototype.increasePool = function () { const maxNumberOfPools = this.getMaxNumberOfPools(); this.logger.debug('increasePool started', maxNumberOfPools, this._poolers.length); - + if (this._poolers.length < maxNumberOfPools) { this.addPooler(this._poolers.length); - return this.increasePool() - } else if (this._poolers.length > maxNumberOfPools) { - return false - } else { - return true + return this.increasePool(); + } + + if (this._poolers.length > maxNumberOfPools) { + return false; } + + return true; }; Worker.prototype.addTask = function (params) { - //this.logger.count('addTask'); + // This.logger.count('addTask'); const task = new Task(Object.assign({}, params, {worker: this, logger: this.logger})); this._tasks.push(task); this.emit('task', params); task.on('finish', () => { - //this.logger.count('finishTask'); + // This.logger.count('finishTask'); const index = this._tasks.indexOf(task); if (index === -1) { throw (new Error('tasks is not registered in _tasks')); } + this._tasks.splice(index, 1); this.updateTasks(); this.increasePool(); @@ -159,24 +166,23 @@ Worker.prototype.addTask = function (params) { this.updateTasks(); }; - Worker.prototype.updateTasks = function () { - if(typeof(this.taskConcurrency) === 'number'){ + if (typeof (this.taskConcurrency) === 'number') { if (this._tasks.length === this.taskConcurrency) { this.emit('full'); - } else if(this._tasks.length > this.taskConcurrency){ - throw(new Error(`Should not reach ${this._tasks.length} tasks`)) + } else if (this._tasks.length > this.taskConcurrency) { + throw (new Error(`Should not reach ${this._tasks.length} tasks`)); } } - + if (this._tasks.length === 0) { - this.logger.info('empty') + this.logger.info('empty'); this.emit('empty'); } }; Worker.prototype.addPooler = function (index) { - this.logger.debug(`addPooler`) + this.logger.debug('addPooler'); const pooler = new Pooler({ activityArn: this.activityArn, workerName: this.workerName, @@ -189,16 +195,17 @@ Worker.prototype.addPooler = function (index) { }; Worker.prototype.removePooler = function (pooler) { - this.logger.debug(`removePooler`) + this.logger.debug('removePooler'); const index = this._poolers.indexOf(pooler); - if(index === -1){ - throw(new Error(`pooler ${pooler} is not in the pooler list`)) + if (index === -1) { + throw (new Error(`pooler ${pooler} is not in the pooler list`)); } + this._poolers.splice(index, 1); - - if(this._poolers.length === 0){ - this.emit('empty-poolers') + + if (this._poolers.length === 0) { + this.emit('empty-poolers'); } }; @@ -220,12 +227,12 @@ Worker.prototype.removePooler = function (pooler) { // return _this._poolers // }).then(r => { // this.logger.debug('removePooler ended') -// +// // this._poolerRemovalPromise = null // return r; // }) -// } -// +// } +// // return this._poolerRemovalPromise; // }; @@ -238,9 +245,9 @@ Worker.prototype.removePooler = function (pooler) { Worker.prototype.close = function (cb) { this.removeAllListeners(); const promise = this.stop(); - - if(cb){ - promise.then(() => cb()).catch(cb) + + if (cb) { + promise.then(() => cb()).catch(cb); } else { return promise; } @@ -256,26 +263,29 @@ Worker.prototype.close = function (cb) { Worker.prototype.stop = function () { this.logger.info('Stopping the worker ... this might take 60 seconds'); this.poolConcurrency = 0; - if(!this._stoppingPromise){ + if (!this._stoppingPromise) { this._stoppingPromise = new Promise((resolve, reject) => { const onEmpty = () => { this.logger.info('Worker stopped'); if (this._tasks.length > 0) { const err = new Error('Some tasks are still ongoing, please make sure all the tasks are finished before stopping the worker'); return reject(err); - } else { - return resolve(); } - } - if(this._poolers.length === 0){ + + return resolve(); + }; + + if (this._poolers.length === 0) { onEmpty(); } + this.once('empty-poolers', () => { onEmpty(); - }) - }) + }); + }); } - return this._stoppingPromise + + return this._stoppingPromise; }; Worker.prototype.restart = function (cb) { @@ -286,7 +296,6 @@ Worker.prototype.restart = function (cb) { }); }; - util.inherits(Worker, EventEmitter); module.exports = Worker; diff --git a/test/scenarios/issue-16.js b/test/scenarios/issue-16.js index a233c5a..5ea056c 100644 --- a/test/scenarios/issue-16.js +++ b/test/scenarios/issue-16.js @@ -1,9 +1,10 @@ const test = require('ava'); const AWS = require('aws-sdk'); +const winston = require('winston'); const StepFunctionWorker = require('../..'); const createActivity = require('../utils/create-activity'); const cleanUp = require('../utils/clean-up'); -const winston = require('winston'); + const stepFunction = new AWS.StepFunctions(); const workerName = 'test worker name'; const stateMachineName = 'test-state-machine-' + Math.floor(Math.random() * 1000); @@ -23,26 +24,27 @@ process.on('uncaughtException', err => { const context = {}; const before = createActivity.bind(null, { - context, - activityName, - stateMachineName, + context, + activityName, + stateMachineName, workerName }); const after = cleanUp.bind(null, context); -const sentInput = function(i){ +const sentInput = function (i) { return { foo: 'bar', index: i - }; -} + }; +}; + const sentOutput = {foo2: 'bar2'}; const taskDurationBase = 500; const fn = function (event, callback, heartbeat) { heartbeat(); - - const totalDuration = Math.ceil(Math.random()*taskDurationBase); + + const totalDuration = Math.ceil(Math.random() * taskDurationBase); setTimeout(() => { // Assert.equal(event, sentInput); heartbeat(); @@ -50,23 +52,23 @@ const fn = function (event, callback, heartbeat) { setTimeout(() => { // Assert.equal(event, sentInput); heartbeat(); - }, 2*totalDuration); + }, 2 * totalDuration); setTimeout(() => { // Assert.equal(event, sentInput); heartbeat(); - }, 3*totalDuration); + }, 3 * totalDuration); setTimeout(() => { // Assert.equal(event, sentInput); heartbeat(); - }, 4*totalDuration); + }, 4 * totalDuration); setTimeout(() => { // Assert.equal(event, sentInput); heartbeat(); - }, 5*totalDuration); + }, 5 * totalDuration); setTimeout(() => { // Assert.equal(event, sentInput); callback(null, sentOutput); - }, 6*totalDuration); + }, 6 * totalDuration); }; test.before(before); @@ -84,32 +86,33 @@ test.serial('Step function Activity Worker with 200 parallel tasks and heartbeat logger: new winston.Logger({ level: 'debug', transports: [ - new (winston.transports.Console)({ - timestamp: function() { - return (new Date()).toISOString().slice(11); - }, - formatter: function(options) { - // - Return string will be passed to logger. - // - Optionally, use options.colorize(options.level, ) to - // colorize output based on the log level. - return options.timestamp() + ' ' + - winston.config.colorize(options.level, options.level.toUpperCase()) + ' ' + - (options.message ? options.message : '') + - (options.meta && Object.keys(options.meta).length ? '\n\t'+ JSON.stringify(options.meta) : '' ); - } - }) - ] + new (winston.transports.Console)({ + timestamp() { + return (new Date()).toISOString().slice(11); + }, + formatter(options) { + // - Return string will be passed to logger. + // - Optionally, use options.colorize(options.level, ) to + // colorize output based on the log level. + return options.timestamp() + ' ' + + winston.config.colorize(options.level, options.level.toUpperCase()) + ' ' + + (options.message ? options.message : '') + + (options.meta && Object.keys(options.meta).length > 0 ? '\n\t' + JSON.stringify(options.meta) : ''); + } + }) + ] }), poolConcurrency, taskConcurrency }); - const params = function(i){ + const params = function (i) { return { stateMachineArn, input: JSON.stringify(sentInput(i)) }; }; + let count = 0; let countFull = 0; worker.on('task', () => { @@ -132,8 +135,8 @@ test.serial('Step function Activity Worker with 200 parallel tasks and heartbeat countFull, totalTasks, taskConcurrency - }) - //t.is(Math.abs(countFull - (totalTasks-taskConcurrency))/totalTasks) + }); + // T.is(Math.abs(countFull - (totalTasks-taskConcurrency))/totalTasks) const endDate = new Date(); worker.logger.info(`Spent ${(endDate - startDate) / 1000} seconds`); worker.close(() => { @@ -146,4 +149,4 @@ test.serial('Step function Activity Worker with 200 parallel tasks and heartbeat }); }); -//test.after(after); +test.after(after); From 1ea9291d68e6146f998007a22fbd868b108a3da4 Mon Sep 17 00:00:00 2001 From: Pierre Date: Wed, 15 May 2019 17:00:55 +0200 Subject: [PATCH 4/8] task should emit 'finish' --- lib/task.js | 2 ++ test/scenarios/issue-16.js | 2 ++ 2 files changed, 4 insertions(+) diff --git a/lib/task.js b/lib/task.js index ed0b1b1..d7a3c0c 100644 --- a/lib/task.js +++ b/lib/task.js @@ -64,6 +64,7 @@ Task.prototype.succeed = function (res) { workerName: this.workerName }); this._finished = true; + this.emit('finish'); }; Task.prototype.fail = function (err) { @@ -74,6 +75,7 @@ Task.prototype.fail = function (err) { workerName: this.workerName }); this._finished = true; + this.emit('finish'); }; Task.prototype.heartbeat = function () { diff --git a/test/scenarios/issue-16.js b/test/scenarios/issue-16.js index 5ea056c..77c0c08 100644 --- a/test/scenarios/issue-16.js +++ b/test/scenarios/issue-16.js @@ -130,6 +130,7 @@ test.serial('Step function Activity Worker with 200 parallel tasks and heartbeat return new Promise((resolve, reject) => { worker.once('empty', () => { + console.log('worker empty'); t.is(count, totalTasks); console.log({ countFull, @@ -139,6 +140,7 @@ test.serial('Step function Activity Worker with 200 parallel tasks and heartbeat // T.is(Math.abs(countFull - (totalTasks-taskConcurrency))/totalTasks) const endDate = new Date(); worker.logger.info(`Spent ${(endDate - startDate) / 1000} seconds`); + console.log('worker close'); worker.close(() => { resolve(); }); From c72ca9c481c2200f9ed9bfc55a26be3f8f0e442c Mon Sep 17 00:00:00 2001 From: Pierre Date: Wed, 15 May 2019 17:31:55 +0200 Subject: [PATCH 5/8] rm logs --- test/scenarios/issue-16.js | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/test/scenarios/issue-16.js b/test/scenarios/issue-16.js index 77c0c08..8fde57f 100644 --- a/test/scenarios/issue-16.js +++ b/test/scenarios/issue-16.js @@ -130,17 +130,15 @@ test.serial('Step function Activity Worker with 200 parallel tasks and heartbeat return new Promise((resolve, reject) => { worker.once('empty', () => { - console.log('worker empty'); t.is(count, totalTasks); - console.log({ - countFull, - totalTasks, - taskConcurrency - }); + // console.log({ + // countFull, + // totalTasks, + // taskConcurrency + // }); // T.is(Math.abs(countFull - (totalTasks-taskConcurrency))/totalTasks) const endDate = new Date(); worker.logger.info(`Spent ${(endDate - startDate) / 1000} seconds`); - console.log('worker close'); worker.close(() => { resolve(); }); From b1e1cacf9de08e3212e2475618d51de814711163 Mon Sep 17 00:00:00 2001 From: Pierre Date: Wed, 15 May 2019 17:32:11 +0200 Subject: [PATCH 6/8] xo --- test/scenarios/issue-16.js | 5 ----- 1 file changed, 5 deletions(-) diff --git a/test/scenarios/issue-16.js b/test/scenarios/issue-16.js index 8fde57f..6b33c8e 100644 --- a/test/scenarios/issue-16.js +++ b/test/scenarios/issue-16.js @@ -131,11 +131,6 @@ test.serial('Step function Activity Worker with 200 parallel tasks and heartbeat return new Promise((resolve, reject) => { worker.once('empty', () => { t.is(count, totalTasks); - // console.log({ - // countFull, - // totalTasks, - // taskConcurrency - // }); // T.is(Math.abs(countFull - (totalTasks-taskConcurrency))/totalTasks) const endDate = new Date(); worker.logger.info(`Spent ${(endDate - startDate) / 1000} seconds`); From 7bc0c9a0e28cf74fcf80f7950ae2d9c9d421a31a Mon Sep 17 00:00:00 2001 From: Pierre Date: Fri, 17 May 2019 16:01:13 +0200 Subject: [PATCH 7/8] BREAKING CHANGES: Redesign of the whole architecture --- lib/worker.js | 25 +++++++++++++++++++++++-- test/scenarios/issue-16.js | 1 + test/scenarios/test.js | 2 ++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/lib/worker.js b/lib/worker.js index 480c7ad..0b5c7a9 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -209,6 +209,21 @@ Worker.prototype.removePooler = function (pooler) { } }; +Worker.prototype.removeTask = function (pooler) { + this.logger.debug('removePooler'); + + const index = this._poolers.indexOf(pooler); + if (index === -1) { + throw (new Error(`pooler ${pooler} is not in the pooler list`)); + } + + this._poolers.splice(index, 1); + + if (this._poolers.length === 0) { + this.emit('empty-poolers'); + } +}; + // Worker.prototype.removePooler = function () { // if(!this._poolerRemovalPromise){ // this._poolerRemovalPromise = Promise.resolve() @@ -290,10 +305,16 @@ Worker.prototype.stop = function () { Worker.prototype.restart = function (cb) { const oldPoolConcurrency = this.poolConcurrency; - return this.stop().then(() => { + + const promise = this.stop().then(() => { this.poolConcurrency = oldPoolConcurrency; - return this.start(cb); + return this.start(); }); + if (cb) { + promise.catch(cb).then(() => cb()); + } else { + return promise; + } }; util.inherits(Worker, EventEmitter); diff --git a/test/scenarios/issue-16.js b/test/scenarios/issue-16.js index 6b33c8e..61e46b5 100644 --- a/test/scenarios/issue-16.js +++ b/test/scenarios/issue-16.js @@ -131,6 +131,7 @@ test.serial('Step function Activity Worker with 200 parallel tasks and heartbeat return new Promise((resolve, reject) => { worker.once('empty', () => { t.is(count, totalTasks); + t.true(countFull > 0); // T.is(Math.abs(countFull - (totalTasks-taskConcurrency))/totalTasks) const endDate = new Date(); worker.logger.info(`Spent ${(endDate - startDate) / 1000} seconds`); diff --git a/test/scenarios/test.js b/test/scenarios/test.js index 7f7db50..9e342cf 100644 --- a/test/scenarios/test.js +++ b/test/scenarios/test.js @@ -197,7 +197,9 @@ test.serial('Restart the worker', t => { if (countSuccess === 1) { const beforeRestartLength = worker._poolers.length; + console.log('restart'); worker.restart(() => { + console.log('restarted'); t.is(worker._poolers.length, beforeRestartLength); stepFunction.startExecution(params2).promise(); }); From c5491186f12dcaa4758a4af6fa5528ec55235016 Mon Sep 17 00:00:00 2001 From: Pierre Date: Mon, 20 May 2019 09:04:08 +0200 Subject: [PATCH 8/8] change base duration for travis to pass --- test/scenarios/issue-16.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/scenarios/issue-16.js b/test/scenarios/issue-16.js index 61e46b5..4f7588f 100644 --- a/test/scenarios/issue-16.js +++ b/test/scenarios/issue-16.js @@ -40,7 +40,7 @@ const sentInput = function (i) { const sentOutput = {foo2: 'bar2'}; -const taskDurationBase = 500; +const taskDurationBase = 2000; const fn = function (event, callback, heartbeat) { heartbeat();