Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batched job processing (opt-in) #474

Open
wants to merge 108 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
108 commits
Select commit Hold shift + click to select a range
3b4092b
Start work on batch fetching jobs
benjie Jun 6, 2024
ecaa062
Batched job fetching with watermark
benjie Jun 6, 2024
9b2ba39
Fix getJob call to reflect changes in #469
benjie Jun 6, 2024
484e02c
Hoist completeJob and failJob
benjie Jun 6, 2024
862251d
Refactor failJob/completeJob in preparation for batching
benjie Jun 6, 2024
2382ed1
Stub batch function
benjie Jun 6, 2024
9538247
Implement batching function
benjie Jun 6, 2024
3752cdc
Lint fix
benjie Jun 6, 2024
113ef71
Tweak graceful/forceful shutdown handover
benjie Jun 6, 2024
1821691
Evaluate envvar just once up top
benjie Jun 6, 2024
c542aa3
Release batches before releasing worker pool
benjie Jun 6, 2024
93fb462
Refactor
benjie Jun 6, 2024
1cab674
Warn about bad batch settings
benjie Jun 10, 2024
fde4341
Refactor in preparation for new queue
benjie Jun 10, 2024
64f6aab
Refactor again and explain the purpose of LocalQueue and how it works
benjie Jun 10, 2024
044aa5a
Implement localQueue
benjie Jun 10, 2024
33faf35
Return to POLLING from WAITING; RELEASE mode.
benjie Jun 11, 2024
8ac92e6
Fix releasing from POLLING mode
benjie Jun 11, 2024
d7e381e
Concurrency 24 works better on my new machine
benjie Jun 11, 2024
1f6a4b0
Wait for background tasks to complete before releasing
benjie Jun 11, 2024
6b51f26
Default settings
benjie Jun 11, 2024
016ac39
Remove double map
benjie Jun 11, 2024
59f4d99
More efficient implementation
benjie Jun 11, 2024
52dd450
Neater API
benjie Jun 11, 2024
af7a26b
Marginally more efficient
benjie Jun 11, 2024
d05c966
Need to test more jobs because we're too fast now
benjie Jun 11, 2024
9da3ed8
Boolean comparison first
benjie Jun 11, 2024
99e9231
Single job different statement, plus disabled alternative approach
benjie Jun 11, 2024
f757f0d
Listen with new LocalQueue too
benjie Jun 11, 2024
37b5055
Lint fixes
benjie Jun 11, 2024
222c979
Oops
benjie Jun 11, 2024
96b2126
0.17.0-canary.6c2c85c
benjie Jul 12, 2024
333c9c2
0.17.0-canary.67dbcb6
benjie Oct 16, 2024
56ca277
[ci] bump
benjie Oct 18, 2024
afd7059
[ci] bump
benjie Oct 18, 2024
25f1fe8
[ci] bump
benjie Oct 18, 2024
307683e
[ci] bump
benjie Oct 18, 2024
28a173f
[ci] bump
benjie Oct 18, 2024
32f8299
[ci] bump
benjie Oct 18, 2024
a119a8b
[ci] bump
benjie Oct 18, 2024
2b7bd90
[ci] bump
benjie Oct 18, 2024
c43bdf9
[ci] bump
benjie Oct 18, 2024
6d1cfa4
Update src/localQueue.ts
benjie Oct 30, 2024
8868d1f
Move localQueue options into their own object
benjie Oct 25, 2024
9767d48
Introduce STARTING mode
benjie Oct 25, 2024
6d19367
Add localQueue refetchDelay feature: be kind on DB when queue is near…
benjie Oct 25, 2024
a087440
Randomize the abort threshold
benjie Oct 25, 2024
e3ec005
Cleaner branching without setting fetchAgain by accident
benjie Oct 25, 2024
d98598d
Lint
benjie Nov 1, 2024
8f72ece
Don't trigger refetch once setting mode to released
benjie Nov 11, 2024
098d203
Fix bug and clarify variable name
benjie Nov 11, 2024
b45b59d
Comments and variable renames for clarity
benjie Nov 11, 2024
835502f
Clarify and fix behavior of refetch delay
benjie Nov 13, 2024
abc16f8
Reduce diff
benjie Nov 13, 2024
187c4e0
0.17.0-canary.379fb2e
benjie Nov 13, 2024
f044bfe
Update performance results
benjie Nov 14, 2024
da79374
Beginnings of tower defence
benjie Nov 14, 2024
4b8462c
More waves and don't wait between waves
benjie Nov 14, 2024
6c7d92d
Add some events so we can collect stats from tower defence
benjie Nov 14, 2024
24d51c1
Easier to read results
benjie Nov 14, 2024
e295c72
Extreme values demonstrate problem
benjie Nov 14, 2024
5749bed
Produce the problematic behavior I was worried about
benjie Nov 14, 2024
993cf44
Clarify doc
benjie Nov 14, 2024
c0d7e9b
Rename abortThreshold to maxAbortThreshold and update docs and implem…
benjie Nov 15, 2024
6b9179c
More waves
benjie Nov 15, 2024
5126d39
Shorter waves
benjie Nov 15, 2024
81bc1bb
Ive we've already been running jobs slowly, don't sleep so long
benjie Nov 15, 2024
1308d84
Track empty fetches
benjie Nov 15, 2024
57b411e
Use addJobs API and track latency
benjie Nov 15, 2024
d2e4f12
Can't delete DB if workerUtils is connected
benjie Nov 15, 2024
daa2a62
Stupid mistake
benjie Nov 15, 2024
ee97cde
Fix division by zero error
benjie Nov 15, 2024
f000ad0
Add another wave, and increase local queue size by 1
benjie Nov 15, 2024
b445bf6
Use JSON rather than constructing tuples; batch at 1M
benjie Nov 15, 2024
1afae2c
0.17.0-canary.6aeb577
benjie Nov 15, 2024
51c5648
0.17.0-canary.9817f67
benjie Nov 15, 2024
6a7dd03
"Breaking" change
benjie Nov 18, 2024
c141856
Fix migration tests
benjie Nov 18, 2024
e7bd577
Fix all the issues from TypeScript strict mode
benjie Nov 18, 2024
91f1a5e
Move release of batching of fail/complete to terminate
benjie Nov 18, 2024
f4b7c4a
Test that graceful shutdown works in runOnce
benjie Nov 18, 2024
aa43296
Implement middleware system
benjie Nov 19, 2024
b746b4a
gracefulShutdown middleware
benjie Nov 19, 2024
b57766a
Shutting down non-continuous worker should go via gracefulShutdown path
benjie Nov 19, 2024
4e72225
f
benjie Nov 19, 2024
17ccefd
Add middleware for forcefulShutdown, guarantee event emitter, rework …
benjie Nov 19, 2024
49bcbee
Clarify relationship between CompiledSharedOptions and WorkerPluginCo…
benjie Nov 19, 2024
8035788
Fix lint issues
benjie Nov 19, 2024
ddce61f
Only shutdown if we're not already doing so
benjie Nov 19, 2024
18835b1
Fix onTerminate handling
benjie Nov 20, 2024
c83069a
Forceful shutdown should error if something went wrong
benjie Nov 20, 2024
40afb61
Forceful shutdown should result in promise rejection
benjie Nov 20, 2024
cf50392
Refactoring to ensure all cases are handled
benjie Nov 20, 2024
6afe773
We don't release job releasers here any more.
benjie Nov 20, 2024
4bb7227
Ensure that LocalQueue exits with the correct status (e.g. rejects if…
benjie Nov 20, 2024
4b6b5dc
Fix types in a test
benjie Nov 20, 2024
9848f8c
I threw it myself, I know what it is
benjie Nov 20, 2024
b62b8e3
Ensure deactivate happens at most once, and yields same errors if cal…
benjie Nov 20, 2024
0bdcd9f
Ensure forcefulShutdown and gracefulShutdown yield the same promises …
benjie Nov 20, 2024
6bc93fe
Graceful shutdown should not complete if forceful shutdown has begun
benjie Nov 20, 2024
b3150ac
Cleanup gracefulShutdown handover to forcefulShutdown
benjie Nov 20, 2024
a561feb
More consistently handle errors in forcefulShutdown
benjie Nov 20, 2024
3c3a237
Move promise handling outside of the gracefulShutdown/forcefulShutdow…
benjie Nov 20, 2024
28f3b0e
Only finish once
benjie Nov 20, 2024
7aa5955
The _moment_ that deactivate is called, all future getJob calls shoul…
benjie Nov 22, 2024
174c76c
Apply timeouts to forceful shutdown
benjie Nov 22, 2024
afcaaed
forcefulShutdown can be successful; channel errors separately
benjie Nov 22, 2024
fdd041c
On unexpected worker exit, shut down entire pool (gracefully)
benjie Nov 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion __tests__/migrate.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {

const options: WorkerSharedOptions = {};

const MAX_MIGRATION_NUMBER = 18;
const MAX_MIGRATION_NUMBER = 19;

test("migration installs schema; second migration does no harm", async () => {
await withPgClient(async (pgClient) => {
Expand Down Expand Up @@ -238,6 +238,7 @@ test("throws helpful error message in migration 11", async () => {

// Manually run the first 10 migrations
const event = {
ctx: compiledSharedOptions,
client: pgClient,
postgresVersion: 120000, // TODO: use the actual postgres version
scratchpad: Object.create(null),
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "graphile-worker",
"version": "0.16.6",
"version": "0.17.0-canary.9817f67",
"type": "commonjs",
"description": "Job queue for PostgreSQL",
"main": "dist/index.js",
Expand All @@ -18,6 +18,7 @@
"depcheck": "depcheck --ignores='graphile-worker,faktory-worker,@google-cloud/tasks,bullmq,jest-environment-node,@docusaurus/*,@fortawesome/*,@mdx-js/*,@types/jest,clsx,eslint_d,graphile,juice,postcss-nested,prism-react-renderer,react,react-dom,svgo,ts-node,@types/debug,tslib'",
"db:dump": "./scripts/dump_db",
"perfTest": "cd perfTest && node ./run.js",
"towerDefence": "cd towerDefence && node ./run.mjs",
"preversion": "grep '^### Pending' RELEASE_NOTES.md && echo \"⚠️ Cannot publish with 'Pending' in RELEASE_NOTES ⚠️\" && exit 1 || true",
"version": "node scripts/postversion.mjs && git add src/version.ts",
"website": "cd website && yarn run"
Expand Down
15 changes: 14 additions & 1 deletion perfTest/graphile.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,29 @@

// import { WorkerProPreset } from "../graphile-pro-worker/dist/index.js";

const CONCURRENT_JOBS = 24;

/** @type {GraphileConfig.Preset} */
const preset = {
// extends: [WorkerProPreset],
worker: {
connectionString:
process.env.PERF_DATABASE_URL || "postgres:///graphile_worker_perftest",
concurrentJobs: 3,
fileExtensions: [".js", ".cjs", ".mjs"],
// fileExtensions: [".js", ".cjs", ".mjs", ".ts", ".cts", ".mts"],
gracefulShutdownAbortTimeout: 2500,

concurrentJobs: CONCURRENT_JOBS,
maxPoolSize: CONCURRENT_JOBS + 1,

//localQueue: { size: -1 },
//completeJobBatchDelay: -1,
//failJobBatchDelay: -1,

localQueue: { size: 500, refetchDelay: { durationMs: 10 } },
completeJobBatchDelay: 0,
failJobBatchDelay: 0,
},
};

module.exports = preset;
24 changes: 11 additions & 13 deletions perfTest/init.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,19 @@ $$ language plpgsql;`,
} else {
const jobs = [];
for (let i = 0; i < jobCount; i++) {
jobs.push(
`("${taskIdentifier.replace(
/["\\]/g,
"\\$&",
)}","{\\"id\\":${i}}",,,,,,)`,
jobs.push({ identifier: taskIdentifier, payload: { id: i } });
}
console.time(`Adding jobs`);
while (jobs.length > 0) {
const jobsSlice = jobs.splice(0, 1000000);
const jobsString = JSON.stringify(jobsSlice);
console.log(`Adding ${jobsSlice.length} jobs`);
await pgPool.query(
`select 1 from graphile_worker.add_jobs(array(select json_populate_recordset(null::graphile_worker.job_spec, $1::json)));`,
[jobsString],
);
console.log(`...added`);
}
const jobsString = `{"${jobs
.map((j) => j.replace(/["\\]/g, "\\$&"))
.join('","')}"}`;
console.time("Adding jobs");
await pgPool.query(
`select graphile_worker.add_jobs($1::graphile_worker.job_spec[]);`,
[jobsString],
);
console.timeEnd("Adding jobs");
}

Expand Down
5 changes: 1 addition & 4 deletions perfTest/latencyTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ const preset = require("./graphile.config.js");
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

/** @type {import('../dist/index.js').WorkerPoolOptions} */
const options = {
concurrency: 1,
preset,
};
const options = { preset };

async function main() {
const pgPool = new Pool({ connectionString: process.env.PERF_DATABASE_URL });
Expand Down
15 changes: 3 additions & 12 deletions perfTest/run.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ const { execSync, exec: rawExec } = require("child_process");
const { promisify } = require("util");
const exec = promisify(rawExec);

const JOB_COUNT = 20000;
const JOB_COUNT = 200000;
const STUCK_JOB_COUNT = 0;
const PARALLELISM = 4;
const CONCURRENCY = 10;

const time = async (cb) => {
const start = process.hrtime();
Expand Down Expand Up @@ -52,10 +51,7 @@ async function main() {
console.log("Timing startup/shutdown time...");
let result;
const startupTime = await time(async () => {
result = await exec(
`node ../dist/cli.js --once -j ${CONCURRENCY} -m ${CONCURRENCY + 1}`,
execOptions,
);
result = await exec(`node ../dist/cli.js --once`, execOptions);
});
logResult(result);
console.log();
Expand All @@ -81,12 +77,7 @@ async function main() {
const dur = await time(async () => {
const promises = [];
for (let i = 0; i < PARALLELISM; i++) {
promises.push(
exec(
`node ../dist/cli.js --once -j ${CONCURRENCY} -m ${CONCURRENCY + 1}`,
execOptions,
),
);
promises.push(exec(`node ../dist/cli.js --once`, execOptions));
}
(await Promise.all(promises)).map(logResult);
});
Expand Down
4 changes: 4 additions & 0 deletions sql/000019.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
--! breaking-change
-- This is just a breaking change marker for the v0.17 worker-centric to
-- pool-centric jump. The migration itself is not breaking.
select 1;
3 changes: 3 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { cosmiconfigSync } from "cosmiconfig";
import EventEmitter from "events";

import { MINUTE, SECOND } from "./cronConstants";
import type { WorkerEvents } from "./interfaces";
import { defaultLogger } from "./logger";

const cosmiconfigResult = cosmiconfigSync("graphile-worker").search();
Expand Down Expand Up @@ -34,6 +36,7 @@ export const makeWorkerPresetWorkerOptions = () =>
maxResetLockedInterval: 10 * MINUTE,
gracefulShutdownAbortTimeout: 5 * SECOND,
useNodeTime: false,
events: new EventEmitter() as WorkerEvents,
} satisfies GraphileConfig.WorkerOptions);

function enforceStringOrUndefined(
Expand Down
5 changes: 5 additions & 0 deletions src/generated/sql.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading