Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inserts job in existing transaction by passing pgClient instance #515

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions docs/api/jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,36 @@ Available in constructor as a default, or overridden in send.

```ts
interface Db {
executeSql(text: string, values: any[]): Promise<{ rows: any[] }>;
}
executeSql(text: string, values: any[], options?: ExecuteOptions): Promise<{ rows: any[] }>;
}
```

**Execute options**

* **executeOptions**, object

This object allows you to pass options to the `executeSql` function of the underlying database adapter.

* **pgClient**, `pg.Client` object

You can provide a custom `pgClient` instance, so PGBoss can insert jobs in an already started database transaction.
```ts
const pgClient = new pg.Client("postgres://...")
try {
await pgClient.query('BEGIN')
await pgClient.query(`INSERT INTO public.my_business_table (label) VALUES('My data')`)

boss.send('my-job', { someData: 'someValue' }, {
executeOptions: { pgClient }
})

await pgClient.query('COMMIT')
} catch (e) {
await pgClient.query('ROLLBACK')
}
```
PostgreSQL supports cross-schema transactions, so this is a great way to provide strong consistency guarantees between your business data and PGBoss jobs.

**Deferred jobs**

* **startAfter** int, string, or Date
Expand Down
5 changes: 4 additions & 1 deletion src/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ class Db extends EventEmitter {
}
}

async executeSql (text, values) {
async executeSql (text, values, executeOptions) {
if (executeOptions?.pgClient) {
return await executeOptions.pgClient.query(text, values)
}
if (this.opened) {
// if (this.config.debug === true) {
// console.log(`${new Date().toISOString()}: DEBUG SQL`)
Expand Down
3 changes: 2 additions & 1 deletion src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ class Manager extends EventEmitter {
const {
id = null,
db: wrapper,
executeOptions,
priority,
startAfter,
singletonKey = null,
Expand Down Expand Up @@ -373,7 +374,7 @@ class Manager extends EventEmitter {
]

const db = wrapper || this.db
const { rows } = await db.executeSql(this.insertJobCommand, values)
const { rows } = await db.executeSql(this.insertJobCommand, values, executeOptions)

if (rows.length === 1) {
return rows[0].id
Expand Down
12 changes: 12 additions & 0 deletions test/databaseTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,16 @@ describe('database', function () {

assert(response.text === query)
})

it('uses custom pg Client when provided', async function () {
const query = 'SELECT something FROM somewhere'
const pgClientMock = {
query: async (text, values) => ({ rows: [], text })
}

const boss = new PgBoss('postgres://bobby:tables@wat:12345/northwind')
const response = await boss.getDb().executeSql(query, [], { pgClient: pgClientMock })

assert(response.text === query)
})
})
121 changes: 121 additions & 0 deletions test/sendTest.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const assert = require('node:assert')
const helper = require('./testHelper')
const pg = require('pg')

describe('send', function () {
it('should fail with no arguments', async function () {
Expand Down Expand Up @@ -136,4 +137,124 @@ describe('send', function () {

assert(!job)
})

/** @type {pg.Client | null} */
let otherPgClient = null

afterEach(async function () {
if (otherPgClient) {
await otherPgClient.end()
otherPgClient = null
}
})

it('should accept job object with name and custom pg Client', async function () {
const pgBossOptions = { ...this.test.bossConfig }
const boss = this.test.boss = await helper.start(pgBossOptions)
const queue = this.test.bossConfig.schema

otherPgClient = new pg.Client(helper.getConnectionString(pgBossOptions))
await otherPgClient.connect()

let called = false
const options = {
executeOptions: {
// This is a mock to ensure the custom pg Client is used
pgClient: {
query: async (text, values) => {
called = true
return otherPgClient.query(text, values)
}
}
},
someCrazyOption: 'whatever'
}

await boss.send({ name: queue, options })

const [job] = await boss.fetch(queue)

assert.notEqual(job, null)
assert.strictEqual(job.data, null)
assert.strictEqual(called, true)
})

it('should create job if transaction succeed with custom pg Client', async function () {
const pgBossOptions = { ...this.test.bossConfig }
const boss = this.test.boss = await helper.start(pgBossOptions)
const { schema } = this.test.bossConfig
const queue = schema

otherPgClient = new pg.Client(helper.getConnectionString(pgBossOptions))
await otherPgClient.connect()
const otherSchema = `${schema}_test_external`
await otherPgClient.query(`CREATE SCHEMA IF NOT EXISTS ${otherSchema}`)
await otherPgClient.query(`CREATE TABLE IF NOT EXISTS ${otherSchema}.test (label VARCHAR(50))`)
await otherPgClient.query(`TRUNCATE ${otherSchema}.test RESTART IDENTITY`)

try {
await otherPgClient.query('BEGIN')
const options = {
executeOptions: {
pgClient: otherPgClient
},
someCrazyOption: 'whatever'
}
const queryText = `INSERT INTO ${otherSchema}.test(label) VALUES('Test')`
await otherPgClient.query(queryText)

await boss.send({ name: queue, options })

await otherPgClient.query('COMMIT')
} catch (e) {
await otherPgClient.query('ROLLBACK')
}

const [job] = await boss.fetch(queue)
const externalSchemaResult = await otherPgClient.query(`SELECT * FROM ${otherSchema}.test`)

assert(!!job)
assert(externalSchemaResult.rowCount === 1)
})

it('should not create job if transaction fails with custom pg Client', async function () {
const pgBossOptions = { ...this.test.bossConfig }
const boss = this.test.boss = await helper.start(pgBossOptions)
const { schema } = this.test.bossConfig
const queue = schema

otherPgClient = new pg.Client(helper.getConnectionString(pgBossOptions))
await otherPgClient.connect()
const otherSchema = `${schema}_test_external`
await otherPgClient.query(`CREATE SCHEMA IF NOT EXISTS ${otherSchema}`)
await otherPgClient.query(`CREATE TABLE IF NOT EXISTS ${otherSchema}.test (label VARCHAR(50))`)
await otherPgClient.query(`TRUNCATE ${otherSchema}.test RESTART IDENTITY`)

const throwError = () => { throw new Error('Error!!') }

try {
await otherPgClient.query('BEGIN')
const options = {
executeOptions: {
pgClient: otherPgClient
},
someCrazyOption: 'whatever'
}
const queryText = `INSERT INTO ${otherSchema}.test(label) VALUES('Test')`
await otherPgClient.query(queryText)

await boss.send({ name: queue, options })

throwError()
await otherPgClient.query('COMMIT')
} catch (e) {
await otherPgClient.query('ROLLBACK')
}

const [job] = await boss.fetch(queue)
const externalSchemaResult = await otherPgClient.query(`SELECT * FROM ${otherSchema}.test`)

assert(!job)
assert(externalSchemaResult.rowCount === 0)
})
})
9 changes: 8 additions & 1 deletion types.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { EventEmitter } from 'events'
import { ClientBase } from 'pg'

declare namespace PgBoss {

Expand All @@ -17,8 +18,13 @@ declare namespace PgBoss {
singleton: 'singleton',
stately: 'stately'
}

interface ExecuteOptions {
pgClient?: ClientBase;
}

interface Db {
executeSql(text: string, values: any[]): Promise<{ rows: any[] }>;
executeSql(text: string, values: any[], options?: ExecuteOptions): Promise<{ rows: any[] }>;
}

interface DatabaseOptions {
Expand Down Expand Up @@ -109,6 +115,7 @@ declare namespace PgBoss {

interface ConnectionOptions {
db?: Db;
executeOptions?: ExecuteOptions;
}

type InsertOptions = ConnectionOptions;
Expand Down