From a601c00156b0b389ab8be0d2aa68148b3e84311b Mon Sep 17 00:00:00 2001 From: Cody Stoltman Date: Mon, 25 Jan 2016 16:59:27 -0600 Subject: [PATCH 1/2] arrange deps by name --- package.json | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index 88678dd..820a94a 100644 --- a/package.json +++ b/package.json @@ -15,12 +15,12 @@ "url": "git://github.com/balderdashy/sails-postgresql.git" }, "dependencies": { - "pg": "~4.4.0", - "lodash": "~3.10.0", "async": "~1.5.2", + "lodash": "~3.10.0", + "pg": "~4.4.0", + "waterline-cursor": "~0.0.6", "waterline-errors": "~0.10.0", - "waterline-sequel": "~0.5.3", - "waterline-cursor": "~0.0.6" + "waterline-sequel": "~0.5.3" }, "devDependencies": { "mocha": "*", From 76357187bc62afaff981a538e0ebcabfeb943484 Mon Sep 17 00:00:00 2001 From: Cody Stoltman Date: Mon, 25 Jan 2016 16:59:52 -0600 Subject: [PATCH 2/2] update the stream method --- lib/adapter.js | 86 +++++++++++++++++++++++++++++++------------------- package.json | 1 + 2 files changed, 54 insertions(+), 33 deletions(-) diff --git a/lib/adapter.js b/lib/adapter.js index 2b40c6f..065dfe1 100644 --- a/lib/adapter.js +++ b/lib/adapter.js @@ -5,6 +5,7 @@ // Dependencies var pg = require('pg'); +var QueryStream = require('pg-query-stream'); var _ = require('lodash'); var url = require('url'); var async = require('async'); @@ -265,12 +266,12 @@ module.exports = (function() { // If the schema name is "public", just finish creating the table if (schemaName == 'public') {return _define();} - // If not, attempt to create the schema first. This will succeed if + // If not, attempt to create the schema first. This will succeed if // the schema already exists. adapter.createSchema(connectionName, table, schemaName, _define); function _define(errCreatingSchema) { - + if (errCreatingSchema) { cb(handleQueryError(errCreatingSchema)); } @@ -333,7 +334,7 @@ module.exports = (function() { throw new Error("No schemaName specified, and could not determined schemaname for table `" + table + "`"); } - // Build Query + // Build Query var query = 'CREATE SCHEMA "' + schemaName + '"'; spawnConnection(connectionName, function (client, cb) { @@ -994,42 +995,61 @@ module.exports = (function() { // Stream one or more models from the collection stream: function(connectionName, table, options, stream) { + spawnConnection(connectionName, function __STREAM__(client, cb) { - var connectionObject = connections[connectionName]; - var collection = connectionObject.collections[table]; + // Grab Connection Schema + var schema = {}; + var connectionObject = connections[connectionName]; + var collection = connectionObject.collections[table]; + var tableName = table; - var client = new pg.Client(connectionObject.config); - client.connect(); + Object.keys(connectionObject.collections).forEach(function(coll) { + schema[coll] = connectionObject.collections[coll].schema; + }); - var schema = {}; + // Build Query + var _schema = connectionObject.schema; + var processor = new Processor(_schema); - Object.keys(connectionObject.collections).forEach(function(coll) { - schema[coll] = connectionObject.collections[coll].schema; - }); + // Mixin WL Next connection overrides to sqlOptions + var overrides = connectionOverrides[connectionName] || {}; + var _options = _.cloneDeep(sqlOptions); + if(hop(overrides, 'wlNext')) { + _options.wlNext = overrides.wlNext; + } - // Build Query - var _schema = collection.schema; - var queryObj = new Query(_schema, schema); - var query =queryObj.find(table, options); + var sequel = new Sequel(_schema, _options); + var _query; - // Run Query - var dbStream = client.query(query.query, query.values); + // Build a query for the specific query strategy + try { + _query = sequel.find(tableName, options); + } catch(e) { + return cb(e); + } - //can stream row results back 1 at a time - dbStream.on('row', function(row) { - stream.write(row); - }); + var streamQuery = new QueryStream(_query.query[0], _query.values[0]); + var dbStream = client.query(streamQuery); + + // Can stream row results back 1 at a time + dbStream.on('data', function(row) { + stream.write(row); + }); + // + dbStream.on('error', function(err) { + stream.end(); // End stream + cb(); + }); + + // fired after last row is emitted + dbStream.on('end', function() { + stream.end(); // End stream + cb(); + }); + + }, cb); - dbStream.on('error', function(err) { - stream.end(); // End stream - client.end(); // Close Connection - }); - //fired after last row is emitted - dbStream.on('end', function() { - stream.end(); // End stream - client.end(); // Close Connection - }); }, // Update one or more models in the collection @@ -1323,15 +1343,15 @@ module.exports = (function() { rule: 'unique' }]; } - } + } - else if (err.code === '3F000') { + else if (err.code === '3F000') { formattedErr = {}; formattedErr.message = 'Attempted to create a table `' + err.table + '` in a schema `' + err.schemaName + '` that does not exist. Either create the schema manually or make sure that the `createSchemas` flag in your connection configuration is not set to `false`.'; delete err.table; delete err.schemaName; formattedErr.originalError = err; - } + } else if (err.type == 'CREATING_SCHEMA') { formattedErr = {}; diff --git a/package.json b/package.json index 820a94a..1c9ec93 100644 --- a/package.json +++ b/package.json @@ -18,6 +18,7 @@ "async": "~1.5.2", "lodash": "~3.10.0", "pg": "~4.4.0", + "pg-query-stream": "^1.0.0", "waterline-cursor": "~0.0.6", "waterline-errors": "~0.10.0", "waterline-sequel": "~0.5.3"