From e9e0b72d3ecad689515fb3ebc5def73632f1535b Mon Sep 17 00:00:00 2001 From: Daniel Mickens Date: Tue, 30 Jan 2024 09:17:25 -0500 Subject: [PATCH] Support for copy local stdin (#130) * New feature: Support for copy from local stdin * Support for glob patterns in copy from local file --- packages/v-protocol/src/backend-messages.ts | 7 +- packages/v-protocol/src/parser.ts | 10 +- packages/vertica-nodejs/README.md | 41 +++++++ packages/vertica-nodejs/lib/client.js | 20 ++-- packages/vertica-nodejs/lib/connection.js | 18 ++- packages/vertica-nodejs/lib/query.js | 47 ++++++-- ...copy-tests.js => copy-local-file-tests.js} | 51 ++++---- .../client/copy-local-stdin-tests.js | 113 ++++++++++++++++++ 8 files changed, 258 insertions(+), 49 deletions(-) rename packages/vertica-nodejs/mochatest/integration/client/{copy-tests.js => copy-local-file-tests.js} (81%) create mode 100644 packages/vertica-nodejs/mochatest/integration/client/copy-local-stdin-tests.js diff --git a/packages/v-protocol/src/backend-messages.ts b/packages/v-protocol/src/backend-messages.ts index 2c027bdd..57ee286d 100644 --- a/packages/v-protocol/src/backend-messages.ts +++ b/packages/v-protocol/src/backend-messages.ts @@ -269,14 +269,15 @@ export class NoticeMessage implements BackendMessage, NoticeOrError { export class VerifyFilesMessage { public readonly name: MessageName = 'verifyFiles' - public readonly fileNames: string[] + public readonly fileNames: string[] | null constructor(public readonly length: number, public numFiles: number, - public files: string[], + public files: string[] | null, public readonly rejectFile: string, public readonly exceptionFile: string) { - this.fileNames = [...files] // shallow copy + // shallow copy the fileNames, or null for copy from local stdin + this.fileNames = files !== null ? 
[...files] : null } } diff --git a/packages/v-protocol/src/parser.ts b/packages/v-protocol/src/parser.ts index f894b480..098d0ca2 100644 --- a/packages/v-protocol/src/parser.ts +++ b/packages/v-protocol/src/parser.ts @@ -243,10 +243,14 @@ export class Parser { private parseVerifyFilesMessage(offset: number, length: number, bytes: Buffer) { this.reader.setBuffer(offset, bytes) const numFiles = this.reader.int16() //int16 number of files, n - const fileNames: string[] = new Array(numFiles) - for (let i = 0; i < numFiles; i++) { - fileNames[i] = this.reader.cstring() //string[n], name of each file + let fileNames: string[] | null = null; + if (numFiles !== 0) { + fileNames = new Array(numFiles); + for (let i = 0; i < numFiles; i++) { + fileNames[i] = this.reader.cstring(); // string[n], name of each file + } } + const rejectFile = this.reader.cstring() //string reject file name const exceptionFile = this.reader.cstring() //string exceptions file name return new VerifyFilesMessage(length, numFiles, fileNames, rejectFile, exceptionFile) diff --git a/packages/vertica-nodejs/README.md b/packages/vertica-nodejs/README.md index f4f6849c..e4283455 100644 --- a/packages/vertica-nodejs/README.md +++ b/packages/vertica-nodejs/README.md @@ -329,6 +329,47 @@ Prepared statements are slightly different. Here we will provide a query in the }) ``` +### Copy Local Commands + +Copy Local commands allow you to quickly load data from a client system into your Vertica database. There are two types of Copy Local commands: copy from local file and copy from local stdin. If REJECTED DATA or EXCEPTIONS are specified in the copy command, the provided file paths must be writable by the process running the driver. If the files exist, the driver will append to the end of them. If the files don't exist, the driver will create them first. If RETURNREJECTED is specified in place of REJECTED DATA, the rejected rows can be retrieved from the result object with `result.getRejectedRows()`. 
+ +#### Copy From Local File + +Copy from local file opens and reads the file(s) from the client system and sends the data to the server in chunks of 64 KB for insertion. The files must be readable by the process running the driver. + +```javascript + const {Client} = require('vertica-nodejs') + const client = new Client() + + client.connect() + client.query("CREATE LOCAL TEMP TABLE myTable(x int)", (err) => { + if (err) console.log(err) + client.query("COPY myTable FROM LOCAL 'ints.dat' REJECTED DATA 'rejects.txt' EXCEPTIONS 'exceptions.txt'", (err, res) => { + console.log(err || res) + client.end() + }) + }) +``` + +#### Copy From Local Stdin (stream) + +Copy from local stdin in vertica-nodejs can be better described as copy from local stream. The driver supports inserting any stream of data that is an instance of `stream.Readable`. Binary and UTF-8 encoded streams are supported. Because the query text cannot identify the stream the way a copy from local file statement names its file, the stream must be supplied through an additional field on the query config object, `copyStream`. + +```javascript + const {Client} = require('vertica-nodejs') + const client = new Client() + + client.connect() + const readableStream = fs.createReadStream(filePath) // assumes fs is require('fs') and filePath is a string containing the path to a data file + client.query("CREATE LOCAL TEMP TABLE myTable(x int)", (err) => { + if (err) console.log(err) + client.query({text: "COPY myTable FROM LOCAL STDIN RETURNREJECTED", copyStream: readableStream}, (err, res) => { + console.log(err || res.getRejectedRows()) + client.end() + }) + }) +``` + ### Modifying Result Rows with RowMode The Result.rows returned by a query are by default an array of objects with key-value pairs that map the column name to column value for each row. Often you will find you don't need that, especially for very large result sets. In this case you can provide a query object parameter containing the rowMode field set to 'array'. 
This will cause the driver to parse row data into arrays of values without creating an object and having key-value pairs. diff --git a/packages/vertica-nodejs/lib/client.js b/packages/vertica-nodejs/lib/client.js index 1bdc3510..bf3ced9d 100644 --- a/packages/vertica-nodejs/lib/client.js +++ b/packages/vertica-nodejs/lib/client.js @@ -485,8 +485,6 @@ class Client extends EventEmitter { } _handleLoadFile(msg) { - // initiate copy data message transfer. - // What determines the size sent to the server in each message? this.activeQuery.handleLoadFile(msg, this.connection) } @@ -499,7 +497,7 @@ class Client extends EventEmitter { } _handleEndOfBatchResponse() { - //noop + this.activeQuery.handleEndOfBatchResponse(this.connection) } _handleNotice(msg) { @@ -619,23 +617,25 @@ class Client extends EventEmitter { } } + // todo - refactor to improve readibility. Move out logic for identifying parameter types to helper function if possible query(config, values, callback) { // can take in strings, config object or query object - var query - var result - var readTimeout - var readTimeoutTimer - var queryCallback + let query + let result + let readTimeout + let readTimeoutTimer + let queryCallback if (config === null || config === undefined) { throw new TypeError('Client was passed a null or undefined query') - } else if (typeof config.submit === 'function') { + } + if (typeof config.submit === 'function') { readTimeout = config.query_timeout || this.connectionParameters.query_timeout result = query = config if (typeof values === 'function') { query.callback = query.callback || values } - } else { + } else { // config is a string readTimeout = this.connectionParameters.query_timeout query = new Query(config, values, callback) if (!query.callback) { diff --git a/packages/vertica-nodejs/lib/connection.js b/packages/vertica-nodejs/lib/connection.js index 5043613e..6f3a10b3 100644 --- a/packages/vertica-nodejs/lib/connection.js +++ b/packages/vertica-nodejs/lib/connection.js @@ 
-314,7 +314,23 @@ class Connection extends EventEmitter { this._send(serialize.EndOfBatchRequest()) } - sendCopyDataStream(msg) { + sendCopyDataStream(copyStream) { + copyStream.on('readable', () => { + let bytesRead + while ((bytesRead = copyStream.read(bufferSize)) !== null) { + if (Buffer.isBuffer(bytesRead)) { // readableStream is binary + this.sendCopyData(bytesRead) + } else { // readableStream is utf-8 encoded + this.sendCopyData(Buffer.from(bytesRead, 'utf-8')) + } + } + }) + copyStream.on('end', () => { + this.sendEndOfBatchRequest() + }) + } + + sendCopyDataFiles(msg) { const buffer = Buffer.alloc(bufferSize); const fd = fs.openSync(msg.fileName, 'r'); let bytesRead = 0; diff --git a/packages/vertica-nodejs/lib/query.js b/packages/vertica-nodejs/lib/query.js index 60d131d1..5d7171b3 100644 --- a/packages/vertica-nodejs/lib/query.js +++ b/packages/vertica-nodejs/lib/query.js @@ -20,19 +20,21 @@ const Result = require('./result') const utils = require('./utils') const fs = require('fs') const fsPromises = require('fs').promises +const stream = require('stream') +const glob = require('glob') class Query extends EventEmitter { constructor(config, values, callback) { super() config = utils.normalizeQueryConfig(config, values, callback) - this.text = config.text this.values = config.values this.rows = config.rows this.types = config.types this.name = config.name this.binary = config.binary || false + this.copyStream = config.copyStream || null // use unique portal name each time this.portal = config.portal || '' this.callback = config.callback @@ -195,6 +197,13 @@ class Query extends EventEmitter { //do nothing, vertica doesn't support result-row count limit } + handleEndOfBatchResponse(connection) { + if (this.copyStream) { //copy from stdin + connection.sendCopyDone() + } + // else noop, backend will send CopyDoneResponse for copy from local file to continue the process + } + prepare(connection) { // prepared statements need sync to be called after each 
command // complete or when an error is encountered @@ -249,16 +258,36 @@ class Query extends EventEmitter { } handleCopyInResponse(connection) { - connection.sendCopyFail('No source stream defined') + connection.sendCopyDataStream(this.copyStream) } async handleVerifyFiles(msg, connection) { - try { // Check if the data file can be read - await fsPromises.access(msg.files[0], fs.constants.R_OK); - } catch (readInputFileErr) { // Can't open input file for reading, send CopyError - console.log(readInputFileErr.code) - connection.sendCopyError(msg.files[0], 0, '', "Unable to open input file for reading") - return; + if (msg.numFiles !== 0) { // we are copying from file, not stdin + let expandedFileNames = [] + for (const fileName of msg.files) { + if (/[*?[\]]/.test(fileName)) { // contains glob pattern + const matchingFiles = glob.sync(fileName) + expandedFileNames = expandedFileNames.concat(matchingFiles) + } else { + expandedFileNames.push(fileName) + } + } + const uniqueFileNames = [...new Set(expandedFileNames)] // remove duplicates + msg.numFiles = uniqueFileNames.length + msg.fileNames = uniqueFileNames + for (const fileName of uniqueFileNames) { + try { // Check if the data file can be read + await fsPromises.access(fileName, fs.constants.R_OK); + } catch (readInputFileErr) { // Can't open input file for reading, send CopyError + connection.sendCopyError(fileName, 0, '', "Unable to open input file for reading") + return; + } + } + } else { // check to make sure the readableStream is in fact a readableStream + if (!(this.copyStream instanceof stream.Readable)) { + connection.sendCopyError(this.copyStream, 0, '', "Cannot perform copy operation. 
Stream must be an instance of stream.Readable") + return + } } if (msg.rejectFile) { try { // Check if the rejections file can be written to, if specified @@ -300,7 +329,7 @@ class Query extends EventEmitter { } handleLoadFile(msg, connection) { - connection.sendCopyDataStream(msg) + connection.sendCopyDataFiles(msg) } handleWriteFile(msg, connection) { diff --git a/packages/vertica-nodejs/mochatest/integration/client/copy-tests.js b/packages/vertica-nodejs/mochatest/integration/client/copy-local-file-tests.js similarity index 81% rename from packages/vertica-nodejs/mochatest/integration/client/copy-tests.js rename to packages/vertica-nodejs/mochatest/integration/client/copy-local-file-tests.js index 27667776..67a118aa 100644 --- a/packages/vertica-nodejs/mochatest/integration/client/copy-tests.js +++ b/packages/vertica-nodejs/mochatest/integration/client/copy-local-file-tests.js @@ -4,22 +4,22 @@ const assert = require('assert') const path = require('path') const fs = require('fs') -describe('Running Copy Commands', function () { +describe('Running Copy From Local File Commands', function () { // global pool to use for queries const pool = new vertica.Pool() // global file names and paths - const goodFileName = "copy-good.dat" - const badFileName = "copy-bad.dat" - const goodFilePath = path.join(process.cwd(), goodFileName); - const badFilePath = path.join(process.cwd(), badFileName) - const goodFileContents = "1|'a'\n2|'b'\n3|'c'\n4|'d'\n5|'e'" // 5 correctly formatted rows - const badFileContents = "1|'a'\n'b'|2\n3|'c'\n'd'|4\n5|'e'" // rows 2 and 4 malformed + const copyGoodName = "copy-good.dat" + const copyBadName = "copy-bad.dat" + const copyGoodPath = path.join(process.cwd(), copyGoodName); + const copyBadPath = path.join(process.cwd(), copyBadName) + const goodFileContents = "1|a\n2|b\n3|c\n4|d\n5|e\n" // 5 correctly formatted rows + const badFileContents = "6|f\ng|7\n8|h\ni|9\n10|j\n" // rows 2 and 4 malformed // generate temporary test files, create 
table before tests begin before((done) => { - fs.writeFile(goodFilePath, goodFileContents, () => { - fs.writeFile(badFilePath, badFileContents, () => { + fs.writeFile(copyGoodPath, goodFileContents, () => { + fs.writeFile(copyBadPath, badFileContents, () => { pool.query("CREATE TABLE copyTable (num int, let char)", (done)) }) }) @@ -27,8 +27,8 @@ describe('Running Copy Commands', function () { // delete temporary test files, drop table after tests are complete after((done) => { - fs.unlink(goodFilePath, () => { - fs.unlink(badFilePath, () => { + fs.unlink(copyGoodPath, () => { + fs.unlink(copyBadPath, () => { pool.query("DROP TABLE IF EXISTS copyTable", () => { pool.end(done) }) @@ -66,7 +66,7 @@ describe('Running Copy Commands', function () { assert.equal(res.rows[0]['Rows Loaded'], 3) // 3 good rows in badFileContents fs.readFile('rejects.txt', 'utf8', (err, data) => { assert.equal(err, undefined) - assert.equal(data, "'b'|2\n'd'|4\n") // rows 2 and 4 are malformed + assert.equal(data, "g|7\ni|9\n") // rows 2 and 4 are malformed }) } finally { fs.unlink('rejects.txt', done) @@ -185,20 +185,25 @@ describe('Running Copy Commands', function () { }); }) - it ('behaves properly with ABORT ON ERROR', function(done) { - done() - }) - - it('succeeds using glob patterns', function(done) { - done() - }) it('succeeds with multiple input files', function(done) { - done() + pool.query("COPY copyTable FROM LOCAL 'copy-good.dat', 'copy-bad.dat' RETURNREJECTED", (err, res) => { + assert.equal(err, undefined) + assert.equal(res.rows[0]['Rows Loaded'], 8) // 5 good rows in goodFileContents + assert.deepEqual(res.getRejectedRows(), [7, 9]) + done() + }) }) - it('succeeds with basic copy from stdin command', function(done) { - //todo - done() + it('succeeds using glob patterns', function(done) { + pool.query("COPY copyTable FROM LOCAL 'copy-*.dat' RETURNREJECTED", (err, res) => { + assert.equal(err, undefined) + assert.equal(res.rows[0]['Rows Loaded'], 8) // 5 good rows in 
goodFileContents + assert.equal(res.getRejectedRows().length, 2) // check the length instead of position in case the order of files loaded changes + pool.query({text: "SELECT num FROM copyTable ORDER BY num ASC", rowMode: 'array'}, (err, res) => { + assert.deepEqual(res.rows, [[1],[2],[3],[4],[5],[6],[8],[10]]) // 7 and 9 malformed. + done() + }) + }) }) }) diff --git a/packages/vertica-nodejs/mochatest/integration/client/copy-local-stdin-tests.js b/packages/vertica-nodejs/mochatest/integration/client/copy-local-stdin-tests.js new file mode 100644 index 00000000..528008b9 --- /dev/null +++ b/packages/vertica-nodejs/mochatest/integration/client/copy-local-stdin-tests.js @@ -0,0 +1,113 @@ +'use strict' +const vertica = require('../../../lib') +const assert = require('assert') +const path = require('path') +const fs = require('fs') + +describe('Running Copy From Local Stdin Commands', function () { + // global pool to use for queries + const pool = new vertica.Pool() + + // global file names and paths + const copyGoodName = "copy-good.dat" + const copyBadName = "copy-bad.dat" + const copyGoodPath = path.join(process.cwd(), copyGoodName); + const copyBadPath = path.join(process.cwd(), copyBadName) + const goodFileContents = "1|'a'\n2|'b'\n3|'c'\n4|'d'\n5|'e'" // 5 correctly formatted rows + const badFileContents = "1|'a'\n'b'|2\n3|'c'\n'd'|4\n5|'e'" // rows 2 and 4 malformed + + // generate temporary test files, create table before tests begin + before((done) => { + fs.writeFile(copyGoodPath, goodFileContents, () => { + fs.writeFile(copyBadPath, badFileContents, () => { + pool.query("CREATE TABLE copyTable (num int, let char)", (done)) + }) + }) + }) + + // delete temporary test files, drop table after tests are complete + after((done) => { + fs.unlink(copyGoodPath, () => { + fs.unlink(copyBadPath, () => { + pool.query("DROP TABLE IF EXISTS copyTable", () => { + pool.end(done) + }) + }) + }) + }) + + // remove data from table between tests + afterEach((done) => { + 
pool.query("DELETE FROM copyTable", (done)) + }) + + it ('succeeds with basic copy from stdin command', function(done) { + const readableStream = fs.createReadStream(copyGoodPath, { encoding: 'utf8' }) + readableStream.on('open', () => { + pool.query({text: "COPY copyTable FROM LOCAL STDIN RETURNREJECTED", copyStream: readableStream}, (err, res) => { + assert.equal(err, undefined) + assert.equal(res.rows[0]['Rows Loaded'], 5) + done() + }) + }) + }) + + it ('succeeds with a binary input stream', function(done) { + const readableStream = fs.createReadStream(copyGoodPath) + readableStream.on('open', () => { + pool.query({text: "COPY copyTable FROM LOCAL STDIN RETURNREJECTED", copyStream: readableStream}, (err, res) => { + assert.equal(err, undefined) + assert.equal(res.rows[0]['Rows Loaded'], 5) + done() + }) + }) + }) + + it('succeeds when streamed data is larger than buffer size requiring multiple copyData messages', function(done) { + const largeFilePath = path.join(process.cwd(), "large-copy.dat") + const writableStream = fs.createWriteStream(largeFilePath, { encoding: 'utf8' }); + const bytesPerLine = 6 // single quote + letter + single quote + bar + integer + newline = 6 bytes + const desiredFileSize = 66000 // 65536 is our max buffer size. 
This will force multiple copyData messages + const requiredLines = desiredFileSize / bytesPerLine + + for (let i = 1; i <= requiredLines; i++) { + const char = String.fromCharCode('a'.charCodeAt(0) + (i % 26)); // a - z + const line = `${i}|'${char}'\n` + writableStream.write(line) + } + writableStream.end(() => { + const readableStream = fs.createReadStream(largeFilePath, { encoding: 'utf8' }) + readableStream.on('open', () => { + pool.query({text: "COPY copyTable FROM LOCAL STDIN RETURNREJECTED", copyStream: readableStream}, (err, res) => { + try { + assert.equal(err, undefined) + assert.equal(res.rows[0]['Rows Loaded'], requiredLines) + assert.deepEqual(res.getRejectedRows(), []) + } finally { + fs.unlink(largeFilePath, done) + } + }) + }) + }) + }) + + it('returns rejected rows with RETURNREJECTED specified', function(done) { + const readableStream = fs.createReadStream(copyBadPath, { encoding: 'utf8' }) + readableStream.on('open', () => { + pool.query({text: "COPY copyTable FROM LOCAL STDIN RETURNREJECTED", copyStream: readableStream}, (err, res) => { + assert.equal(err, undefined) + assert.equal(res.rows[0]['Rows Loaded'], 3) // 3 good rows in badFileContents + assert.deepEqual(res.getRejectedRows(), [2, 4]) // rows 2 and 4 are malformed + }) + done() + }) + }) + + it('behaves properly when input stream does not exist/is invalid', function(done) { + const badStream = null + pool.query({text: "COPY copyTable FROM LOCAL STDIN RETURNREJECTED", copyStream: badStream}, (err) => { + assert.ok(err.message.includes("Cannot perform copy operation. Stream must be an instance of stream.Readable")) + done() + }) + }) +})