Skip to content

Commit

Permalink
[CONJS-180] compatibility: support mysql2 stream option #173
Browse files Browse the repository at this point in the history
  • Loading branch information
diego Dupin committed Oct 18, 2021
1 parent 8d11d51 commit d78ba3b
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 6 deletions.
9 changes: 6 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
# Change Log

## [3.0.1-rc](https://github.com/mariadb-corporation/mariadb-connector-nodejs/tree/3.0.1-rc) (???)
[Full Changelog](https://github.com/mariadb-corporation/mariadb-connector-nodejs/compare/3.0.0-beta...3.0.1-rc)
## [3.0.0-rc](https://github.com/mariadb-corporation/mariadb-connector-nodejs/tree/3.0.0-rc) (19 Oct 2021)
[Full Changelog](https://github.com/mariadb-corporation/mariadb-connector-nodejs/compare/3.0.0-beta...3.0.0-rc)

Notable change:
* [CONJS-168] stream backpressure not handled well
* [CONJS-172] performance improvement for multi-line result-set + update perf result with recent mysql/mysql2 drivers see [dedicated part](https://github.com/mariadb-corporation/mariadb-connector-nodejs/blob/maintenance/3.x/documentation/benchmarks.md) results.
* [CONJS-168] correct stream backpressure
* [CONJS-176] Change Pool cluster default option removeNodeErrorCount value to Infinity
* [CONJS-175] Missing leakDetectionTimeout option in Typescript description

* [CONJS-178] Update code to recent Ecma version
* [CONJS-179] better pool option `resetAfterUse` default value
* [CONJS-180] compatibility: support mysql2 `stream` option
*
* Corrections:
* [CONJS-125] permit using batch with returning clause
* [CONJS-170] Pool.query(undefined) never release connection
Expand Down
38 changes: 38 additions & 0 deletions documentation/connection-options.md
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,44 @@ mariadb.createConnection({
| **restrictedAuth** | if set, restrict authentication plugin to secure list. Default provided plugins are mysql_native_password, mysql_clear_password, client_ed25519, dialog, sha256_password and caching_sha2_password |*Array|String* | |
| **supportBigNumbers** | (deprecated) DECIMAL/BIGINT data type will be returned as number if in safe integer range, as string if not.|*boolean* | false |
| **bigNumberStrings** | (deprecated) if set with `supportBigNumbers` DECIMAL/BIGINT data type will be returned as string |*boolean* | false |
| **stream** | permits to set a function with parameter to set stream (since 3.0)|*function*| |

### SSH tunnel

In some cases, server is only available through an SSH tunnel.
(This is of course not a recommended solution for production)


The option `stream` permit defined a tunnel. stream function has callback (optional parameters : error, stream).

Example using `tunnel-ssh`:

```
const conn = await mariadb.createConnection({
user: 'myUser',
password: 'mYpwd',
port: 27000,
stream: (cb) => {
const tunnel = require('tunnel-ssh');
tunnel(
{
// remote connection ssh info
username: 'root',
host: '157.230.123.7',
port: 22,
privateKey: fs.readFileSync('./pop_key.ppk'),
// database (here on ssh server)
dstHost: '127.0.0.1',
dstPort: 3306,
// local interface
localHost: '127.0.0.1',
localPort: 27000
},
cb
);
}
});
```


## F.A.Q.
Expand Down
1 change: 1 addition & 0 deletions lib/config/connection-options.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class ConnectionOptions {
this.user = opts.user || process.env.USERNAME;
this.password = opts.password;
this.database = opts.database;
this.stream = opts.stream;
if (opts.charset && typeof opts.charset === 'string') {
this.collation = Collations.fromCharset(opts.charset.toLowerCase());
if (this.collation === undefined) {
Expand Down
34 changes: 33 additions & 1 deletion lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ const OkPacket = require('./cmd/class/ok-packet');
const Execute = require('./cmd/execute');
const ClosePrepare = require('./cmd/close-prepare');
const BatchBulk = require('./cmd/batch-bulk');
const Stream = require('./cmd/stream');
const ChangeUser = require('./cmd/change-user');
const { Status } = require('./const/connection_status');

Expand Down Expand Up @@ -773,10 +772,43 @@ class Connection extends EventEmitter {
streamInitSocket(authFailHandler) {
if (this.opts.socketPath) {
this.socket = Net.connect(this.opts.socketPath);
} else if (this.opts.stream) {
if (typeof this.opts.stream === 'function') {
const tmpSocket = this.opts.stream(
function (err, stream) {
if (err) {
authFailHandler(err);
return;
} else if (stream) {
this.socket = stream;
this.socketInit(authFailHandler);
} else {
this.socket = Net.connect(this.opts.port, this.opts.host);
this.socketInit(authFailHandler);
}
}.bind(this)
);
if (tmpSocket) {
this.socket = tmpSocket;
this.socketInit(authFailHandler);
}
} else {
const err = Errors.createError(
'stream option is not a function. stream must be a function with (error, callback) parameter',
Errors.ER_BAD_PARAMETER_VALUE,
this.info
);
authFailHandler(err);
}
return;
} else {
this.socket = Net.connect(this.opts.port, this.opts.host);
}

this.socketInit(authFailHandler);
}

socketInit(authFailHandler) {
if (this.opts.connectTimeout) {
this.timeout = setTimeout(
this.connectTimeoutReached.bind(this),
Expand Down
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@
"denque": "^1.5.0",
"iconv-lite": "^0.6.3",
"moment-timezone": "^0.5.33",
"please-upgrade-node": "^3.2.0",
"promise-mysql": "^5.0.4"
"please-upgrade-node": "^3.2.0"
},
"devDependencies": {
"@typescript-eslint/eslint-plugin": "^5.0.0",
Expand Down
24 changes: 24 additions & 0 deletions test/integration/test-connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const Collations = require('../../lib/const/collations.js');
const Conf = require('../conf');
const Connection = require('../../lib/connection');
const ConnOptions = require('../../lib/config/connection-options');
const Net = require('net');

describe('connection', () => {
it('with no connection attributes', function (done) {
Expand Down Expand Up @@ -434,6 +435,29 @@ describe('connection', () => {
});
});

it('stream basic test', async function () {
const conn = await base.createConnection({
stream: (cb) => {
cb(null, new Net.connect(Conf.baseConfig.port, Conf.baseConfig.host));
}
});
conn.end();

const conn2 = await base.createConnection({
stream: () => {
return new Net.connect(Conf.baseConfig.port, Conf.baseConfig.host);
}
});
conn2.end();

const conn3 = await base.createConnection({
stream: (cb) => {
cb();
}
});
conn3.end();
});

it('connection error', function (done) {
base
.createConnection({
Expand Down
12 changes: 12 additions & 0 deletions test/integration/test-error.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ describe('Error', () => {
.catch(done);
});

it('stream type error', async function () {
try {
await base.createConnection({ stream: 'wrong' });
throw new Error('must have thrown error');
} catch (err) {
assert.isTrue(err.message.includes('stream option is not a function'));
assert.equal(err.errno, 45043);
assert.equal(err.sqlState, 'HY000');
assert.equal(err.code, 'ER_BAD_PARAMETER_VALUE');
}
});

it('query callback error with trace', function (done) {
const conn = base.createCallbackConnection({ trace: true });
conn.connect((err1) => {
Expand Down
1 change: 1 addition & 0 deletions test/unit/config/test-options.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ describe('test options', () => {
insertIdAsNumber: false,
skipSetTimezone: false,
typeCast: undefined,
stream: undefined,
bigIntAsNumber: false,
bulk: true,
permitLocalInfile: false,
Expand Down
10 changes: 10 additions & 0 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ export interface LoggerConfig {
error?: (err: Error) => void;
}

export function StreamCallback(err?: Error, stream?: stream.Duplex): void;

export interface QueryConfig {
/**
* Presents result-sets by table to avoid results with colliding fields.
Expand Down Expand Up @@ -344,6 +346,14 @@ export interface ConnectionConfig extends UserConnectionConfig, QueryConfig {
* default to 256.
*/
prepareCacheLength?: number;

/**
* Permit to set stream.
*
* @param err error is any error occurs during stream creation
* @param stream if wanting to set a special stream (Standard socket will be created if not set)
*/
stream?: (callback?: typeof StreamCallback) => void;
}

export interface PoolConfig extends ConnectionConfig {
Expand Down
4 changes: 4 additions & 0 deletions types/mariadb-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ function createConnection(option?: ConnectionConfig): Promise<mariadb.Connection
network: (msg) => console.log(msg),
query: (msg) => console.log(msg),
error: (err) => console.log(err)
},
stream: (callback) => {
console.log('test');
callback(null, null);
}
});
}
Expand Down

0 comments on commit d78ba3b

Please sign in to comment.