Skip to content

Commit

Permalink
Merge pull request #11 from piercus/issue-10
Browse files Browse the repository at this point in the history
Issue 10
  • Loading branch information
piercus authored Dec 8, 2018
2 parents bffbd6e + dc73373 commit 7a6b399
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 67 deletions.
8 changes: 7 additions & 1 deletion lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,13 @@ Worker.prototype.close = function (cb) {
};

Worker.prototype.execute = function (input, cb, heartbeat) {
this.fn(input, cb, heartbeat);
setImmediate(() => {
try {
this.fn(input, cb, heartbeat);
} catch (err) {
cb(err);
}
});
};

Worker.prototype.succeed = function (res) {
Expand Down
111 changes: 45 additions & 66 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

123 changes: 123 additions & 0 deletions test/scenarios/sync-fn.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
const test = require('ava').test;
const AWS = require('aws-sdk');
const StepFunctionWorker = require('../../index.js');
const createActivity = require('../utils/create-activity');
const cleanUp = require('../utils/clean-up');

const stepFunction = new AWS.StepFunctions();
const workerName = 'test worker name';
const stateMachineName = 'test-state-machine-' + Math.floor(Math.random() * 1000);
const activityName = 'test-step-function-worker-' + Math.floor(Math.random() * 1000);

process.on('uncaughtException', err => {
console.log('uncaughtException', err);
});
/*
{
definition: '{"Comment":"An Example State machine using Activity.","StartAt":"FirstState","States":{"FirstState":{"Type":"Task","Resource":"arn:aws:states:eu-central-1:170670752151:activity:test-step-function-worker","TimeoutSeconds":300,"HeartbeatSeconds":60,"Next":"End"}}}',
name: 'test-state-machine',
roleArn: 'arn:aws:iam::170670752151:role/service-role/StatesExecutionRole-eu-central-1'
}
*/

const context = {};

const before = createActivity.bind(null, {context, activityName, stateMachineName, workerName});
const after = cleanUp.bind(null, context);

const sentInput = {foo: 'bar'};
const sentOutput = {foo2: 'bar2'};

const fn = function (event, callback) {
callback(null, sentOutput);
};

const fnError = function () {
throw (new Error('custom error'));
};

test.before(before);

test.serial('Step function Activity Worker with 2 consecutive synchronous tasks', t => {
const activityArn = context.activityArn;
const stateMachineArn = context.stateMachineArn;

const worker = new StepFunctionWorker({
activityArn,
workerName: workerName + '-fn',
fn
});

return new Promise((resolve, reject) => {
let expectedTaskToken;
const params = {
stateMachineArn,
input: JSON.stringify(sentInput)
};
worker.once('task', task => {
// Task.taskToken
// task.input
t.deepEqual(task.input, sentInput);
t.is(typeof (task.taskToken), 'string');
expectedTaskToken = task.taskToken;
});
worker.on('error', reject);
worker.once('success', out => {
t.is(out.taskToken, expectedTaskToken);

let expectedTaskToken2;
worker.once('task', task => {
// Task.taskToken
// task.input
expectedTaskToken2 = task.taskToken;
});

worker.once('success', out => {
t.is(out.taskToken, expectedTaskToken2);
worker.close(() => {
resolve();
});
});

stepFunction.startExecution(params).promise();
});

stepFunction.startExecution(params).promise();
});
});

test.serial('Step function Activity Worker with synchronous failing task', t => {
const activityArn = context.activityArn;
const stateMachineArn = context.stateMachineArn;

const worker = new StepFunctionWorker({
activityArn,
workerName: workerName + '-fn',
fn: fnError
});

return new Promise((resolve, reject) => {
let expectedTaskToken;
const params = {
stateMachineArn,
input: JSON.stringify(sentInput)
};
worker.once('task', task => {
// Task.taskToken
// task.input
t.deepEqual(task.input, sentInput);
t.is(typeof (task.taskToken), 'string');
expectedTaskToken = task.taskToken;
});
worker.once('failure', out => {
t.is(out.taskToken, expectedTaskToken);
t.is(out.error.message, 'custom error');
worker.close(() => {
resolve();
});
});
worker.once('success', reject);
stepFunction.startExecution(params).promise();
});
});
test.after(after);

0 comments on commit 7a6b399

Please sign in to comment.