Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for LISTEN / NOTIFY / RAISE events #48

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/nodejs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
node-version: [16.x, 20.x]

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v3
with:
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/npm-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: actions/setup-node@v3
with:
node-version: 18
Expand All @@ -22,7 +22,7 @@ jobs:
needs: build
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: actions/setup-node@v3
with:
node-version: 18
Expand All @@ -39,7 +39,7 @@ jobs:
# contents: read
# packages: write
# steps:
# - uses: actions/checkout@v3
# - uses: actions/checkout@v4
# - uses: actions/setup-node@v3
# with:
# node-version: 18
Expand Down
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ It supports *splitting* the resultset and *backpressure* (flow control), to allo

It supports *parameterized queries* and *multiple queries*.

It supports [`LISTEN`](https://www.postgresql.org/docs/current/sql-listen.html) / [`NOTIFY`](https://www.postgresql.org/docs/current/sql-notify.html) / [`RAISE`](https://www.postgresql.org/docs/current/plpgsql-errors-and-messages.html) events.

## Outputs

The response (rows) is provided in `msg.payload` as an array.
Expand All @@ -19,6 +21,22 @@ See the [underlying documentation](https://node-postgres.com/apis/result) for de

In the case of multiple queries, then `msg.pgsql` is an array.

## Events

If you tick the *Keep listening for notifications* option along with a [`LISTEN`](https://www.postgresql.org/docs/current/sql-listen.html) request (see below),
you will receive a message for each [`NOTIFY`](https://www.postgresql.org/docs/current/sql-notify.html) event,
where `msg.channel` is the name of the channel, `msg.payload` is content of the notification, and `msg.processId` indicates the process ID.

```sql
LISTEN my_topic;
```

This can be especially useful when combined with a [`TRIGGER`](https://www.postgresql.org/docs/15/sql-createtrigger.html) to be informed of database changes such as new insertions.

See the [underlying documentation](https://node-postgres.com/apis/client#notification) for details.

Likewise, you may get notice messages producted by [`RAISE`](https://www.postgresql.org/docs/current/plpgsql-errors-and-messages.html) as a `msg.notice` message.

## Inputs

### SQL query template
Expand Down
17 changes: 17 additions & 0 deletions locales/en-US/postgresql.html
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,23 @@ <h3>Outputs</h3>
</p>
<p>In the case of multiple queries, then <code>msg.pgsql</code> is an array.</p>

<h3>Events</h3>
<p>
If you tick the <em>Keep listening for notifications</em> option along with a <a href="https://www.postgresql.org/docs/current/sql-listen.html"><code>LISTEN</code></a> request (see below),
you will receive a message for each <a href="https://www.postgresql.org/docs/current/sql-notify.html"><code>NOTIFY</code> event,
where <code>msg.channel</code> is the name of the channel, <code>msg.payload</code> is content of the notification, and <code>msg.processId</code> indicates the process ID.
</p>

<pre>
LISTEN my_topic;
</pre>

This can be especially useful when combined with a <a href="https://www.postgresql.org/docs/15/sql-createtrigger.html"><code>TRIGGER</code></a> to be informed of database changes such as new insertions.

See the <a href="https://node-postgres.com/apis/client#notification">underlying documentation</a> for details.

Likewise, you may get notice messages producted by <a href="https://www.postgresql.org/docs/current/plpgsql-errors-and-messages.html"><code>RAISE</code></a> as a <code>msg.notice</code> message.

<h3>Inputs</h3>
<h4>SQL query template</h4>
<p>This node uses the <a href="https://github.com/janl/mustache.js">Mustache template system</a> to generate queries based on the message:</p>
Expand Down
1 change: 1 addition & 0 deletions locales/en-US/postgresql.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"server": "Server",
"query": "Query",
"split": "Split results in multiple messages",
"listen": "Keep listening for notifications",
"rowsPerMsg": "Number of rows per message"
},
"placeholder": {
Expand Down
18 changes: 14 additions & 4 deletions postgresql.html
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<span data-i18n="postgresql.label.port"></span>
</label>
<input type="text" id="node-config-input-port" data-i18n="[placeholder]postgresql.placeholder.port" style="width: 80%;" />
<input type="hidden" id="node-config-input-portFieldType" />
<input type="hidden" id="node-config-input-portFieldType" value="num" />
</div>
<div class="form-row">
<label for="node-config-input-database">
Expand Down Expand Up @@ -75,23 +75,23 @@
<span data-i18n="postgresql.label.max"></span>
</label>
<input type="text" id="node-config-input-max" data-i18n="[placeholder]postgresql.placeholder.max" style="width: 60%;" />
<input type="hidden" id="node-config-input-maxFieldType" />
<input type="hidden" id="node-config-input-maxFieldType" value="num" />
</div>
<div class="form-row">
<label for="node-config-input-idle" style="width: 150px;">
<i class="fa fa-hourglass-half"></i>
<span data-i18n="postgresql.label.idle"></span>
</label>
<input type="text" id="node-config-input-idle" data-i18n="[placeholder]postgresql.placeholder.idle" style="width: 60%;" />
<input type="hidden" id="node-config-input-idleFieldType" />
<input type="hidden" id="node-config-input-idleFieldType" value="num" />
</div>
<div class="form-row">
<label for="node-config-input-connectionTimeout" style="width: 150px;">
<i class="fa fa-hourglass-half"></i>
<span data-i18n="postgresql.label.connectionTimeout" ></span>
</label>
<input type="text" id="node-config-input-connectionTimeout" data-i18n="[placeholder]postgresql.placeholder.connectionTimeout" style="width: 60%;" />
<input type="hidden" id="node-config-input-connectionTimeoutFieldType" />
<input type="hidden" id="node-config-input-connectionTimeoutFieldType" value="num" />
</div>
</div>
</div>
Expand Down Expand Up @@ -279,6 +279,12 @@
</label>
<input type="number" id="node-input-rowsPerMsg" placeholder="1" value="1" min="1" />
</div>
<div class="form-row">
<input type="checkbox" id="node-input-listen" style="display: inline-block; width: auto; vertical-align: top;" />
<label for="node-input-listen" style="width: auto;">
<span data-i18n="postgresql.label.listen"></span>
</label>
</div>
<div class="form-row" style="position: relative; margin-bottom: 0px;">
<label for="node-input-query">
<i class="fa fa-file-code-o"></i>
Expand Down Expand Up @@ -313,6 +319,9 @@
rowsPerMsg: {
value: 1,
},
listen: {
value: false,
},
outputs: {
value: 1,
},
Expand All @@ -329,6 +338,7 @@
oneditprepare: function () {
$('#node-input-split').prop('checked', this.split);
$('#node-input-rowsPerMsg').value = this.split ? this.rowsPerMsg : 1;
$('#node-input-listen').prop('checked', this.listen);
this.editor = RED.editor.createEditor({
id: 'node-input-editor',
mode: 'ace/mode/sql',
Expand Down
47 changes: 37 additions & 10 deletions postgresql.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ module.exports = function (RED) {
host: getField(node, n.hostFieldType, n.host),
port: getField(node, n.portFieldType, n.port),
database: getField(node, n.databaseFieldType, n.database),
ssl: getField(node, n.sslFieldType, n.ssl),
ssl: getField(node, n.sslFieldType, n.ssl) != 'false',
application_name: getField(node, n.applicationNameType, n.applicationName),
max: getField(node, n.maxFieldType, n.max),
idleTimeoutMillis: getField(node, n.idleFieldType, n.idle),
connectionTimeoutMillis: getField(node, n.connectionTimeoutFieldType, n.connectionTimeout),
idleTimeoutMillis: +getField(node, n.idleFieldType, n.idle),
connectionTimeoutMillis: +getField(node, n.connectionTimeoutFieldType, n.connectionTimeout),
});
this.pgPool.on('error', (err, _) => {
node.error(err.message);
Expand All @@ -98,6 +98,7 @@ module.exports = function (RED) {
function PostgreSQLNode(config) {
const node = this;
RED.nodes.createNode(node, config);
node.listen = config.listen;
node.topic = config.topic;
node.query = config.query;
node.split = config.split;
Expand Down Expand Up @@ -154,6 +155,8 @@ module.exports = function (RED) {
};
updateStatus(0, false);

let client = null;

node.on('input', async (msg, send, done) => {
// 'send' and 'done' require Node-RED 1.0+
send = send || function () { node.send.apply(node, arguments); };
Expand All @@ -173,8 +176,6 @@ module.exports = function (RED) {
const partsId = Math.random();
let query = msg.query ? msg.query : Mustache.render(node.query, { msg });

let client = null;

const handleDone = async (isError = false) => {
if (cursor) {
cursor.close();
Expand Down Expand Up @@ -214,18 +215,42 @@ module.exports = function (RED) {
}
};

handleDone();
if (node.listen) {
// Avoid multiple listening instances
handleDone();
}
updateStatus(+1);
downstreamReady = true;

try {
if (msg.pgConfig) {
client = new Client(msg.pgConfig);
if (msg.pgConfig || node.listen) {
// Do not use pool for clients with custom config nor for listeners
const pgConfig = Object.assign({},
node.config.pgPool.options, {
password: node.config.password,
}, msg.pgConfig);
client = new Client(pgConfig);
await client.connect();
} else {
client = await node.config.pgPool.connect();
}

client.on('notice', (msg) => {
send({
notice: msg,
});
});

if (node.listen) {
client.on('notification', (msg) => {
send({
channel: msg.channel,
payload: msg.payload,
processId: msg.processId,
});
});
}

let params = [];
if (msg.params && msg.params.length > 0) {
params = msg.params;
Expand All @@ -244,7 +269,7 @@ module.exports = function (RED) {
handleError(err);
} else {
const complete = rows.length < node.rowsPerMsg;
if (complete) {
if (complete && !node.listen) {
handleDone(false);
}
const msg2 = Object.assign({}, msg, {
Expand Down Expand Up @@ -311,7 +336,9 @@ module.exports = function (RED) {
};
}

handleDone();
if (!node.listen) {
handleDone();
}
downstreamReady = false;
send(msg);
if (tickUpstreamNode) {
Expand Down