Skip to content

Commit

Permalink
Merge pull request #19 from piercus/issue-16
Browse files Browse the repository at this point in the history
Redesign
  • Loading branch information
piercus authored May 20, 2019
2 parents 0c3f2d9 + c549118 commit 4ac9fb9
Show file tree
Hide file tree
Showing 6 changed files with 607 additions and 190 deletions.
107 changes: 101 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
[![codecov](https://codecov.io/gh/piercus/step-function-worker/branch/master/graph/badge.svg)](https://codecov.io/gh/piercus/step-function-worker)

# step-function-worker

Create a nodejs aws step-function worker/pooler easily :-)

## install
Expand All @@ -16,32 +17,48 @@ npm install step-function-worker
#### Basic example

```javascript
var fn = function(input, cb, heartbeat){
const fn = function(input, cb, heartbeat){
// do something
doSomething(input)

// call heartbeat sometime to avoid timeout
// call heartbeat to avoid timeout
heartbeat()

// call callback in the end
cb(null, {"foo" : "bar"}); // output must be compatible with JSON.stringify
};

var worker = new StepFunctionWorker({
const worker = new StepFunctionWorker({
activityArn : '<activity-ARN>',
workerName : 'workerName',
fn : fn,
concurrency : 2 // default is 1
taskConcurrency : 22, // default is null = Infinity
poolConcurrency : 2 // default is 1
});
```

### Concurrency management

Since version **3.0**, `concurrency` has been replaced by `poolConcurrency` and `taskConcurrency`.

* `taskConcurrency` (`null` means Infinite)

It represent the maximum number of parallel tasks done by the worker (default: `null`).

* `poolConcurrency` is the maximum number of parallel getActivity, http request (see [`sdk.getActivity`](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/StepFunctions.html#getActivityTask-property)) (default: `1`)

Increase this to have a more responsive worker.

Anyway, you should always have `poolConcurrency` < `taskConcurrency`.

#### Set the Region

By default, this package is built on top of `aws-sdk` so you should set your AWS Region by changing `AWS_REGION` environment variable.

If you want to set it in JS code directly you can do it using `awsConfig` (see https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Config.html to see all available options) like

```
var worker = new StepFunctionWorker({
```javascript
const worker = new StepFunctionWorker({
activityArn : '<activity-ARN>',
workerName : 'workerName',
fn : fn,
Expand All @@ -61,36 +78,114 @@ worker.close(function(){
})
```

#### Get info on current worker

```javascript
// A worker as multiple poolers and multiple running tasks
// You can have infos about it by doing
const {poolers, tasks} = worker.report();

// poolers is an array of {
// startTime: <Date>,
// workerName: <String>,
// status: <String>
// }
//
// tasks is an array of {
// taskToken: <String>,
// input: <Object>,
// startTime: <Date>
// }
//
```

#### Custom logging with winston

You can customize logging by using a [winston](https://www.npmjs.com/package/winston) logger (or winston-like logger) as input

```javascript
const winston = require('winston');

const logger = winston.createLogger({
level: 'debug',
format: winston.format.json(),
defaultMeta: { service: 'user-service' },
transports: [
//
// - Write to all logs with level `info` and below to `combined.log`
// - Write all logs error (and below) to `error.log`.
//
new winston.transports.File({ filename: 'error.log', level: 'error' }),
new winston.transports.File({ filename: 'combined.log' })
]
});

const worker = new StepFunctionWorker({
activityArn : '<activity-ARN>',
workerName : 'workerName',
fn : fn,
logger
});
```

Alternatively, you can just use a winston-like logger

```javascript
const logger = console;

const worker = new StepFunctionWorker({
activityArn : '<activity-ARN>',
workerName : 'workerName',
fn : fn,
logger
});
```

#### Events


```javascript
// when a task starts
worker.on('task', function(task){
// task.taskToken
// task.input
console.log("task ", task.input)
});

// when a task fails
worker.on('failure', function(failure){
// out.error
// out.taskToken
console.log("Failure :",failure.error)
});

// when a heartbeat signal is sent
worker.on('heartbeat', function(beat){
// out.taskToken
console.log("Heartbeat");
});

// when a task succeed
worker.on('success', function(out){
// out.output
// out.taskToken
console.log("Success :",out.output)
});

// when an error happens
worker.on('error', function(err){
console.log("error ", err)
});

// when the worker has no more task to process
worker.on('empty', function(){
console.log("error ", err)
});

// when the worker reaches taskConcurrency tasks
worker.on('full', function(err){
console.log("error ", err)
});
```

### Documentation
Expand Down
138 changes: 67 additions & 71 deletions lib/pooler.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
const util = require('util');
const {EventEmitter} = require('events');
const Task = require('./task.js');
const crypto = require('crypto');

/**
* @class Pooler
Expand All @@ -11,35 +9,27 @@ const Task = require('./task.js');
* */

function Pooler(options) {
EventEmitter.call(this);

this._running = true;
this._task = false;
this.id = crypto.randomBytes(3).toString('hex');
this.logger = options.logger;
this.startTime = new Date();
this.activityArn = options.activityArn;
this.worker = options.worker;
this.index = options.index;
this.workerName = options.workerName && (options.workerName + '-' + this.index);
this._request = null;
this.pool();
this.logger.debug(`new pooler ${this.id}`);
this.getActivityTask();
}

Pooler.prototype.stop = function (cb) {
this._running = false;
if (this._task) {
this._task.removeAllListeners();
}
Pooler.prototype.stop = function () {
this.logger.debug(`Pooler (${this.id}): Stop`);

if (this._request) {
this.on('stopPooling', () => {
this.removeAllListeners();
cb();
if (!this._stoppingPromise) {
this._stoppingPromise = (this._requestPromise || Promise.resolve()).then(() => {
this._stopped = true;
});
// This would be better approach but it does not seem to work
// this._request.abort();
} else {
cb();
}

return this._stoppingPromise;
};

/**
Expand All @@ -54,68 +44,74 @@ Pooler.prototype.stop = function (cb) {
*/
Pooler.prototype.report = function () {
return {
workerName: this.workerName,
status: (this._task ? 'Task under going' : (this._running ? 'Waiting for Tasks' : 'Paused')),
task: this._task && this._task.report()
id: this.id,
startTime: this.startTime,
status: (this._stopped ? 'Stopped' : 'Running')
};
};

Pooler.prototype.restart = function () {
this._running = true;
this.pool();
return this.stop().then(() => {
this._stopped = false;
this.getActivityTask();
return Promise.resolve();
});
};

Pooler.prototype.pool = function () {
if (this._running) {
if (this._task) {
throw (new Error('pool should not be called when task on going'));
}

if (this._request) {
throw (new Error('pool should not be called when request on going'));
}
Pooler.prototype.getActivityTask = function () {
// This.logger.info('getActivityTask');

this.getActivityTask();
} else {
this.emit('stopPooling');
// this.logger.debug(this.workerName + ' getActivityTask ' + this.activityArn);
if (this._stopped) {
return Promise.reject(new Error(`Pooler (${this.id}) is stopped`));
}
};

Pooler.prototype.getActivityTask = function () {
this.logger.debug(this.workerName + ' getActivityTask ' + this.activityArn);
this._request = this.worker.stepfunction.getActivityTask({
activityArn: this.activityArn,
workerName: this.workerName
}, (err, data) => {
this._request = null;
if (err) {
if (!this._requestPromise) {
this.logger.debug(`Pooler (${this.id}): getActivityTask`);

this._requestPromise = this.worker.stepfunction.getActivityTask({
activityArn: this.activityArn,
workerName: this.workerName
}).promise()
.then(data => {
if (data.taskToken && typeof (data.taskToken) === 'string' && data.taskToken.length > 1) {
this.logger.debug(`Pooler (${this.id}): Activity task received (${data.taskToken.slice(0, 10)})`);
const params = Object.assign({}, data, {
input: JSON.parse(data.input),
workerName: this.workerName,
poolerId: this.id
});
return this.worker.addTask(params);
}

this.logger.debug(`Pooler (${this.id}): No activity task received`);
return Promise.resolve();
})
.then(() => {
this._requestPromise = null;
const renewal = this.worker.renewPooler(this);
if (!renewal) {
this.stop();
this.worker.removePooler(this);
return Promise.resolve();
}

return this.getActivityTask();
})
.catch(error => {
// Console.log(err);
if (err.code === 'RequestAbortedError') {
this.logger.error(`Pooler (${this.id}):`, error);
if (error.code === 'RequestAbortedError') {
// In case of abort, close silently
} else {
this.emit('error', err);
}

return;
}

if (data.taskToken && typeof (data.taskToken) === 'string' && data.taskToken.length > 1) {
const params = Object.assign({}, data, {input: JSON.parse(data.input), workerName: this.workerName});

this.worker.emit('task', params);

this._task = new Task(Object.assign({}, params, {worker: this.worker, logger: this.logger}));
} else {
this.worker.emit('error', error);
}

this._task.once('finish', () => {
this._task = null;
this.pool();
// Return Promise.reject(err);
});
} else {
this.pool();
}
});
};
}

util.inherits(Pooler, EventEmitter);
return this._requestPromise;
};

module.exports = Pooler;
Loading

0 comments on commit 4ac9fb9

Please sign in to comment.