Skip to content

Commit

Permalink
Merge pull request #38 from trs/improve-connector-stream-handling
Browse files Browse the repository at this point in the history
Improve connector stream handling
  • Loading branch information
trs authored Aug 18, 2017
2 parents ed086e5 + e555ce9 commit 283be85
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 15 deletions.
15 changes: 10 additions & 5 deletions src/commands/registration/retr.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,20 @@ module.exports = {
.then(() => when.try(this.fs.read.bind(this.fs), command.arg, {start: this.restByteCount}))
.then(stream => {
this.restByteCount = 0;
return when.promise((resolve, reject) => {
this.connector.socket.on('error', err => stream.emit('error', err));

const eventsPromise = when.promise((resolve, reject) => {
this.connector.socket.once('error', err => reject(err));

stream.on('data', data => this.connector.socket.write(data, this.transferType));
stream.on('end', () => resolve(this.reply(226)));
stream.on('error', err => reject(err));
this.reply(150).then(() => this.connector.socket.resume());
stream.once('error', err => reject(err));
stream.once('end', () => resolve());
});

return this.reply(150).then(() => this.connector.socket.resume())
.then(() => eventsPromise)
.finally(() => stream.destroy ? stream.destroy() : null);
})
.then(() => this.reply(226))
.catch(when.TimeoutError, err => {
log.error(err);
return this.reply(425, 'No connection established');
Expand Down
27 changes: 17 additions & 10 deletions src/commands/registration/stor.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,27 @@ module.exports = {
.then(() => when.try(this.fs.write.bind(this.fs), fileName, {append, start: this.restByteCount}))
.then(stream => {
this.restByteCount = 0;
return when.promise((resolve, reject) => {
stream.once('error', err => this.connector.socket.emit('error', err));
stream.once('finish', () => resolve(this.reply(226, fileName)));

// Emit `close` if stream has a close listener, otherwise emit `finish` with the end() method
// It is assumed that the `close` handler will call the end() method
this.connector.socket.once('end', () => stream.listenerCount('close') ? stream.emit('close') : stream.end());
this.connector.socket.once('error', err => reject(err));
const streamPromise = when.promise((resolve, reject) => {
stream.once('error', err => reject(err));
stream.once('finish', () => resolve());
});

const socketPromise = when.promise((resolve, reject) => {
this.connector.socket.on('data', data => stream.write(data, this.transferType));
this.connector.socket.once('end', () => {
if (stream.listenerCount('close')) stream.emit('close');
else stream.end();
resolve();
});
this.connector.socket.once('error', err => reject(err));
});

this.reply(150).then(() => this.connector.socket.resume());
})
.finally(() => stream.destroy ? when.try(stream.destroy.bind(stream)) : null);
return this.reply(150).then(() => this.connector.socket.resume())
.then(() => when.join(streamPromise, socketPromise))
.finally(() => stream.destroy ? stream.destroy() : null);
})
.then(() => this.reply(226, fileName))
.catch(when.TimeoutError, err => {
log.error(err);
return this.reply(425, 'No connection established');
Expand Down
27 changes: 27 additions & 0 deletions test/index.spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/* eslint no-unused-expressions: 0 */
const {expect} = require('chai');
const sinon = require('sinon');
const bunyan = require('bunyan');
const fs = require('fs');

Expand All @@ -10,10 +11,13 @@ before(() => require('dotenv').load());

describe('FtpServer', function () {
this.timeout(2000);
let sandbox;
let log = bunyan.createLogger({name: 'test'});
let server;
let client;

let connection;

before(() => {
server = new FtpServer(process.env.FTP_URL, {
log,
Expand All @@ -26,11 +30,18 @@ describe('FtpServer', function () {
greeting: ['hello', 'world']
});
server.on('login', (data, resolve) => {
connection = data.connection;
resolve({root: process.cwd()});
});

return server.listen();
});
beforeEach(() => {
sandbox = sinon.sandbox.create();
});
afterEach(() => {
sandbox.restore();
});
after(() => {
server.close();
});
Expand Down Expand Up @@ -109,6 +120,22 @@ describe('FtpServer', function () {
});
});

it('STOR fail.txt', done => {
sandbox.stub(connection.fs, 'write').callsFake(function () {
const fsPath = './test/fail.txt';
const stream = require('fs').createWriteStream(fsPath, {flags: 'w+'});
stream.once('error', () => fs.unlink(fsPath));
setTimeout(() => stream.emit('error', new Error('STOR fail test'), 1));
return stream;
});
const buffer = Buffer.from('test text file');
client.put(buffer, 'fail.txt', err => {
expect(err).to.exist;
expect(fs.existsSync('./test/fail.txt')).to.equal(false);
done();
});
});

it('STOR tést.txt', done => {
const buffer = Buffer.from('test text file');
client.put(buffer, 'tést.txt', err => {
Expand Down

0 comments on commit 283be85

Please sign in to comment.