Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support runtime: 'bun_workers' #73

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .github/workflows/nodejs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,28 @@ jobs:
runs-on: ${{matrix.os}}
steps:
- uses: actions/checkout@v2

- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node-version }}

- uses: oven-sh/setup-bun@v1
if: ${{ matrix.os != 'windows-latest' }}
with:
bun-version: latest

- uses: pnpm/action-setup@v2

- name: Install Dependencies
run: pnpm install

- name: Build
run: pnpm build

- name: Test
run: pnpm test:ci

- name: Test Bun
if: ${{ matrix.os != 'windows-latest' }}
run: pnpm test:bun
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.nyc_output
.vscode
node_modules
bun.lockb
dist
coverage
5 changes: 5 additions & 0 deletions benchmark/fixtures/wrap-add-bun.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import add from './add.mjs'

self.onmessage = (event) => {
postMessage(add(event.data))
}
132 changes: 111 additions & 21 deletions benchmark/isolate-benchmark.mjs
Original file line number Diff line number Diff line change
@@ -1,35 +1,66 @@
/*
* Benchmark for testing whether Tinypool's worker creation and teardown is expensive.
* Benchmark focusing on the performance `isolateWorkers` option
*
* Options:
* - `--rounds` (optional) - Specify how many iterations to run
* - `--threads` (optional) - Specify how many threads to use
*/
import { cpus } from 'node:os'
import { Worker } from 'node:worker_threads'

import * as os from 'node:os'
import * as WorkerThreads from 'node:worker_threads'

import Tinypool from '../dist/esm/index.js'

const THREADS = cpus().length - 1
const ROUNDS = 5_000
const IS_BUN = process.versions.bun !== undefined
const USE_ATOMICS = !IS_BUN
const THREADS = parseInt(getArgument('--threads') ?? getMaxThreads(), 10)
const ROUNDS = parseInt(getArgument('--rounds') ?? '2000', 10)

console.log('Options:', { THREADS, ROUNDS, IS_BUN }, '\n')

if (IS_BUN) {
await logTime(
"Tinypool { runtime: 'bun_workers' }",
runTinypool('bun_workers')
)

await logTime('Native Bun workers', runBunWorkers())
process.exit(0)
}

await logTime(
"Tinypool { runtime: 'worker_threads' }",
runTinypool('worker_threads')
)
await logTime(
"Tinypool { runtime: 'child_process' }",
runTinypool('child_process')
)

await logTime('Tinypool', runTinypool)
await logTime('Worker threads', runWorkerThreads)
await logTime('Native node:worker_threads', runNodeWorkerThreads())

async function runTinypool() {
function runTinypool(runtime) {
const pool = new Tinypool({
runtime,
filename: new URL('./fixtures/add.mjs', import.meta.url).href,
isolateWorkers: true,
minThreads: THREADS,
maxThreads: THREADS,
useAtomics: USE_ATOMICS,
})

await Promise.all(
Array(ROUNDS)
.fill()
.map(() => pool.run({ a: 1, b: 2 }))
)
return async function run() {
await Promise.all(
Array(ROUNDS)
.fill()
.map(() => pool.run({ a: 1, b: 2 }))
)
}
}

async function runWorkerThreads() {
function runNodeWorkerThreads() {
async function task() {
const worker = new Worker('./fixtures/wrap-add.mjs')
const worker = new WorkerThreads.Worker('./fixtures/wrap-add.mjs')
worker.postMessage({ a: 1, b: 2 })

await new Promise((resolve, reject) =>
Expand All @@ -50,16 +81,75 @@ async function runWorkerThreads() {
}
}

await Promise.all(
Array(THREADS)
.fill(execute)
.map((task) => task())
)
return async function run() {
await Promise.all(
Array(THREADS)
.fill(execute)
.map((task) => task())
)
}
}

function runBunWorkers() {
async function task() {
const worker = new Worker('./fixtures/wrap-add-bun.mjs')
worker.postMessage({ a: 1, b: 2 })

await new Promise((resolve, reject) => {
worker.onmessage = (event) =>
event.data === 3 ? resolve() : reject('Not 3')
})

await worker.terminate()
}

const pool = Array(ROUNDS).fill(task)

async function execute() {
const task = pool.shift()

if (task) {
await task()
return execute()
}
}

return async function run() {
await Promise.all(
Array(THREADS)
.fill(execute)
.map((task) => task())
)
}
}

function getArgument(flag) {
const index = process.argv.indexOf(flag)
if (index === -1) return

return process.argv[index + 1]
}

function getMaxThreads() {
return os.availableParallelism?.() || os.cpus().length - 1
}

async function logTime(label, method) {
console.log(`${label} | START`)

const start = process.hrtime.bigint()
await method()
const end = process.hrtime.bigint()
console.log(label, 'took', ((end - start) / 1_000_000n).toString(), 'ms')

console.log(`${label} | END ${((end - start) / 1_000_000n).toString()} ms`)

console.log('Cooling down for 2s')
const interval = setInterval(() => process.stdout.write('.'), 100)
await sleep(2_000)
clearInterval(interval)
console.log(' ✓\n')
}

async function sleep(ms) {
await new Promise((resolve) => setTimeout(resolve, ms))
}
4 changes: 4 additions & 0 deletions bun-test/fixtures/eval.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// eslint-disable-next-line no-eval
export default function (code) {
return eval(code)
}
59 changes: 59 additions & 0 deletions bun-test/runtime.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import * as path from 'path'
import { fileURLToPath } from 'url'
import { Tinypool } from '../dist/esm'

const __dirname = path.dirname(fileURLToPath(import.meta.url))

describe('Bun Workers', () => {
test('runs code in Bun Worker', async () => {
const pool = createPool({ runtime: 'bun_workers' })

const result = await pool.run(`
(async () => {
return {
sum: 11 + 12,
isMainThread: Bun.isMainThread,
pid: process.pid,
}
})()
`)
expect(result.sum).toBe(23)
expect(result.isMainThread).toBe(false)
expect(result.pid).toBe(process.pid)
})

test('sets tinypool state', async () => {
const pool = createPool({ runtime: 'bun_workers' })

const result = await pool.run('process.__tinypool_state__')
expect(result.isTinypoolWorker).toBe(true)
expect(result.isBunWorker).toBe(true)
expect(result.isWorkerThread).toBe(undefined)
expect(result.isChildProcess).toBe(undefined)
})

test('errors are serialized', async () => {
const pool = createPool({ runtime: 'bun_workers' })

const error = await pool
.run("throw new TypeError('Test message');")
.catch((e: Error) => e)

expect(error.name).toBe('TypeError')
expect(error.message).toBe('Test message')

// Nope Bun does not do this
// expect(error.stack).toMatch('fixtures/eval.js')
})
})

function createPool(options) {
const pool = new Tinypool({
filename: path.resolve(__dirname, './fixtures/eval.js'),
minThreads: 1,
maxThreads: 1,
...options,
})

return pool
}
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"scripts": {
"test:ci": "node --experimental-vm-modules node_modules/jest/bin/jest.js --no-coverage --runInBand",
"test:dev": "node --experimental-vm-modules --trace-warnings node_modules/jest/bin/jest.js --watch --no-coverage",
"test:bun": "bun --bun test bun-test/**",
"dev": "tsup --watch",
"build": "tsup",
"publish": "clean-publish",
Expand Down Expand Up @@ -72,7 +73,9 @@
"extensionsToTreatAsEsm": [
".ts"
],
"testRegex": "test.(js|ts|tsx)$",
"testMatch": [
"**/test/**/*.test.ts"
],
"verbose": true,
"coverageDirectory": "./coverage/",
"collectCoverage": true,
Expand Down
78 changes: 78 additions & 0 deletions src/entry/bun-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import {
StartupMessage,
ReadyMessage,
RequestMessage,
ResponseMessage,
} from '../common'
import { getHandler, throwInNextTick } from './utils'
import { stderr, stdout } from 'src/utils'

process.__tinypool_state__ = {
isTinypoolWorker: true,
isBunWorker: true,
workerData: null,
workerId: 1,
}

self.onmessage = onWorkerMessage

function onWorkerMessage(event: MessageEvent<StartupMessage>) {
const { filename, name } = event.data

;(async function () {
if (filename !== null) {
await getHandler(filename, name)
}

const readyMessage: ReadyMessage = { ready: true }
self.postMessage(readyMessage, '')
})().catch(throwInNextTick)

if (event.ports?.[0]) {
AriPerkkio marked this conversation as resolved.
Show resolved Hide resolved
event.ports[0].start()
event.ports[0].onmessage = onPortMessage.bind(null, event.ports[0])
}
}

function onPortMessage(port: MessagePort, event: MessageEvent<RequestMessage>) {
const message = event.data
const { taskId, task, filename, name } = message

;(async function () {
let response: ResponseMessage

try {
const handler = await getHandler(filename, name)
if (handler === null) {
throw new Error(`No handler function exported from ${filename}`)
}
let result = await handler(task)
response = {
taskId,
result: result,
error: null,
usedMemory: process.memoryUsage().heapUsed,
}

// If the task used e.g. console.log(), wait for the stream to drain
// before potentially entering the `Atomics.wait()` loop, and before
// returning the result so that messages will always be printed even
// if the process would otherwise be ready to exit.
if (stdout()?.writableLength! > 0) {
await new Promise((resolve) => process.stdout.write('', resolve))
}
if (stderr()?.writableLength! > 0) {
await new Promise((resolve) => process.stderr.write('', resolve))
}
} catch (error) {
response = {
taskId,
result: null,
error,
usedMemory: process.memoryUsage().heapUsed,
}
}

port.postMessage(response)
})().catch(throwInNextTick)
}
Loading
Loading