Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix "stack overflow" issue when processing very large input buffers #1

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 71 additions & 70 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,79 +105,80 @@ LDJSONStream.prototype._reset = function _reset() {

// read up to the next newline
/**
 * Parse as many newline-terminated documents as the internal buffer holds.
 *
 * Starting at `this._docptr`, scans `this.buffer` for the next 0x0a / 0x0d
 * byte, tries to `JSON.parse` everything from the start of the buffer up to
 * and including that byte, and on success pushes the parsed object (object
 * mode) or the raw slice (otherwise) and removes it from the buffer.
 *
 * The whole routine is a single loop: neither "there is another document in
 * the buffer" nor "this document continues on the next line" re-enters the
 * function, so the call stack stays flat no matter how many documents or
 * lines a chunk contains.
 *
 * @param {Function} cb - Invoked exactly once per call: with no arguments
 *   when the buffer is drained, more input is needed, or `_maxDocs` has been
 *   reached; with an Error when a document exceeds `_maxDocLength` or fails
 *   to parse for any reason other than being incomplete.
 */
LDJSONStream.prototype._parseDocs = function _parseDocs(cb) {
  for (;;) {
    if (this._debug) { console.log('_parseDocs'); }

    if (this._maxDocs && this.docsRead >= this._maxDocs) { cb(); return; }

    // advance the pointer to just past the next newline byte (LF or CR)
    var found = false;
    while (!found && this._docptr < this.buffer.length) {
      var byte = this.buffer[this._docptr];
      if (byte === 0x0a || byte === 0x0d) {
        found = true;
      }
      this._docptr++;
    }

    // if the two bytes after the newline just consumed are CR LF, step over
    // one more byte and remember it for the length check below
    // NOTE(review): the pointer is already past the newline here, so this
    // inspects the bytes that follow it - confirm that is the intent.
    var crnl = false;
    if (found && this._docptr < this.buffer.length && this.buffer[this._docptr] === 0x0d && this.buffer[this._docptr + 1] === 0x0a) {
      this._docptr++;
      crnl = true;
    }

    // enforce max doc length (also applies while still waiting for a newline)
    if (this._docptr - (crnl ? 2 : 1) > this._maxDocLength) {
      // discard buffer
      this._reset();
      cb(new Error('document exceeds configured maximum length'));
      return;
    }

    if (!found) {
      // wait for more chunks
      cb();
      return;
    }

    // a newline was found: try to read everything up to it as JSON
    var rawdoc = this.buffer.slice(0, this._docptr);
    var obj;

    try {
      if (this._debug) { console.log('parse', rawdoc.toString()); }
      obj = JSON.parse(rawdoc);
    } catch (err) {
      if (this._debug) { console.error(err, rawdoc); }

      // support multi-line JSON: the document is merely incomplete, so keep
      // the pointer where it is and look for the next newline. Looping here
      // instead of calling _parseDocs again keeps the stack depth constant.
      if (err.message === 'Unexpected end of JSON input') {
        continue;
      }

      this._reset();
      cb(err);
      return;
    }

    // shift the document off the internal buffer and rewind the pointer
    this.buffer = this.buffer.slice(this._docptr);
    this._docptr = 0;

    // push the raw or parsed doc out to the reader
    if (this._objectMode) {
      this.push(obj);
    } else {
      this.push(rawdoc);
    }
    this.docsRead++;

    // nothing left to inspect: done; otherwise go round for the next document
    if (!this.buffer.length) {
      cb();
      return;
    }
  }
};

Expand Down
76 changes: 24 additions & 52 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ var tasks = [];

/* throw */

// Each invalid options value must make the constructor throw. The description
// goes in the third (message) position: a string in the second position is
// taken as the message too, but assert.throws rejects the call as ambiguous
// when the thrown error's message happens to equal that string.
[
  ['', 'opts must be an object'],
  [{ maxDocLength: '' }, 'should require opts.maxDocLength to be a number'],
  [{ maxDocs: '' }, 'opts.maxDocs must be a number'],
  [{ maxBytes: '' }, 'opts.maxBytes must be a number'],
  [{ debug: '' }, 'opts.debug must be a boolean'],
  [{ hide: '' }, 'opts.hide must be a boolean']
].forEach(function(invalidCase) {
  var opts = invalidCase[0];
  var description = invalidCase[1];
  assert.throws(function() { var ls = new LDJSONStream(opts); return ls; }, null, description);
});

/* async */

Expand Down Expand Up @@ -163,13 +163,17 @@ tasks.push(function(done) {

/* should err when only a newline is written */
tasks.push(function(done) {
  // The stream may emit both 'error' and 'close' for the same failure, so the
  // task callback is guarded to run at most once.
  var finished = false;
  function doneOnce() {
    if (finished) { return; }
    finished = true;
    done();
  }

  var ls = new LDJSONStream();
  ls.on('data', function() { throw Error('incomplete object emitted'); });
  ls.on('error', function(err) {
    assert.strictEqual(err.message, 'Unexpected end of JSON input');
    doneOnce();
  });
  ls.on('close', doneOnce);
  ls.end('\r\n');
});

Expand Down Expand Up @@ -245,10 +249,8 @@ tasks.push(function(done) {
ls.end(JSON.stringify(obj1) + '\r\n' + JSON.stringify(obj2));
});

/* should skip noise in previous chunks and emit two generated JSON objects */
/* should not flush if noflush is set */
tasks.push(function(done) {
var noise = '289,df';

var obj1 = {
foo: 'bar'
};
Expand All @@ -260,68 +262,38 @@ tasks.push(function(done) {
qux: null
};

var ls = new LDJSONStream({ objectMode: true });
var ls = new LDJSONStream({ flush: false, objectMode: true });

var arr = [];

ls.on('error', function(err) {
assert.strictEqual(err.message, 'Unexpected token , in JSON at position 3');
});

ls.on('data', function(data) {
arr.push(data);
});

ls.on('end', function() {
assert.strictEqual(arr.length, 2);
assert.strictEqual(arr.length, 1);
assert.deepEqual(arr[0], {
foo: 'bar'
});
assert.deepEqual(arr[1], {
foo: 'baz',
bar: 42,
baz: false,
qux: null
});
done();
});

ls.write(noise + '\n');
ls.write(JSON.stringify(obj1) + '\n' + noise + '\n' + JSON.stringify(obj2));
ls.write('\n' + JSON.stringify(obj2));
ls.end();
ls.end(JSON.stringify(obj1) + '\r\n' + JSON.stringify(obj2));
});

/* should handle large input buffers */
tasks.push(function(done) {
  // enough documents in a single write that a parser which re-enters itself
  // once per document would run out of stack
  var DOC_COUNT = 10000;

  var ls = new LDJSONStream({ objectMode: true });
  var seen = 0;

  ls.on('data', function(obj) {
    seen++;
    assert.deepEqual(obj, {});
  });

  ls.on('end', function() {
    assert.strictEqual(seen, DOC_COUNT);
    done();
  });

  ls.end('{}\n'.repeat(DOC_COUNT));
});

async.series(tasks, function(err) {
Expand Down