diff --git a/packages/core/src/emnapi/index.d.ts b/packages/core/src/emnapi/index.d.ts index cb86cbdd..a626d404 100644 --- a/packages/core/src/emnapi/index.d.ts +++ b/packages/core/src/emnapi/index.d.ts @@ -1,5 +1,5 @@ import type { Context } from '@emnapi/runtime' -import type { ThreadManager } from '@emnapi/wasi-threads' +import type { ThreadManager, ThreadManagerOptionsMain, MainThreadBaseOptions } from '@emnapi/wasi-threads' /** @public */ export declare interface PointerInfo { @@ -65,15 +65,16 @@ export declare interface NodeBinding { /** @public */ export declare interface CreateWorkerInfo { type: 'thread' | 'async-work' + name: string } /** @public */ export declare type BaseCreateOptions = { filename?: string nodeBinding?: NodeBinding - reuseWorker?: boolean + reuseWorker?: ThreadManagerOptionsMain['reuseWorker'] asyncWorkPoolSize?: number - waitThreadStart?: boolean | number + waitThreadStart?: MainThreadBaseOptions['waitThreadStart'] onCreateWorker?: (info: CreateWorkerInfo) => any print?: (str: string) => void printErr?: (str: string) => void diff --git a/packages/core/src/load.ts b/packages/core/src/load.ts index 5b326127..bfc06e4b 100644 --- a/packages/core/src/load.ts +++ b/packages/core/src/load.ts @@ -142,29 +142,41 @@ function loadNapiModuleImpl (loadFn: Function, userNapiModule: NapiModule | unde napiModule.PThread.setup(module, memory) } - if (beforeInit) { - beforeInit({ - instance: originalInstance, - module + const emnapiInit = (): LoadedSource | InstantiatedSource => { + if (beforeInit) { + beforeInit({ + instance: originalInstance, + module + }) + } + napiModule.init({ + instance, + module, + memory, + table }) - } - napiModule.init({ - instance, - module, - memory, - table - }) - - const ret: any = { - instance: originalInstance, - module, - usedInstance: instance + const ret: LoadedSource | InstantiatedSource = { + instance: originalInstance, + module, + usedInstance: instance + } + if (!isLoad) { + (ret as InstantiatedSource).napiModule = napiModule + } + return ret } - if (!isLoad) { - ret.napiModule = napiModule + + if (napiModule.PThread.shouldPreloadWorkers()) { + const poolReady = napiModule.PThread.loadWasmModuleToAllWorkers() + if (loadFn === loadCallback) { + return poolReady.then(emnapiInit) + } else { + throw new Error('Synchronous loading is not supported with worker pool (reuseWorker.size > 0)') + } } - return ret + + return emnapiInit() }) } diff --git a/packages/emnapi/README.md b/packages/emnapi/README.md index 9aa00048..15299ed2 100644 --- a/packages/emnapi/README.md +++ b/packages/emnapi/README.md @@ -952,7 +952,17 @@ instantiateNapiModule(input, { * Reuse the thread worker after thread exit to avoid re-creatation * @defaultValue false */ - reuseWorker: true, + reuseWorker: { + /** + * @see {@link https://emscripten.org/docs/tools_reference/settings_reference.html#pthread-pool-size | PTHREAD_POOL_SIZE} + */ + size: 0, + + /** + * @see {@link https://emscripten.org/docs/tools_reference/settings_reference.html#pthread-pool-size-strict | PTHREAD_POOL_SIZE_STRICT} + */ + strict: false + }, onCreateWorker () { return new Worker('./worker.js') diff --git a/packages/emnapi/src/core/index.ts b/packages/emnapi/src/core/index.ts index 81c77f2a..32a999d8 100644 --- a/packages/emnapi/src/core/index.ts +++ b/packages/emnapi/src/core/index.ts @@ -1,4 +1,4 @@ -import { napiModule } from 'emnapi:shared' +import { napiModule, PThread } from 'emnapi:shared' import * as asyncMod from './async' import * as memoryMod from './memory' @@ -41,6 +41,7 @@ emnapiAWST.init() emnapiExternalMemory.init() emnapiString.init() emnapiTSFN.init() +PThread.init() napiModule.emnapi.syncMemory = emnapiMod.$emnapiSyncMemory napiModule.emnapi.getMemoryAddress = emnapiMod.$emnapiGetMemoryAddress diff --git a/packages/emnapi/src/core/init.ts b/packages/emnapi/src/core/init.ts index fc38a2f9..b9d534d6 100644 --- a/packages/emnapi/src/core/init.ts +++ b/packages/emnapi/src/core/init.ts @@ -36,7 +36,6 @@ declare const process: any export var ENVIRONMENT_IS_NODE = typeof process === 'object' && process !== null && typeof process.versions === 'object' && process.versions !== null && typeof process.versions.node === 'string' export var ENVIRONMENT_IS_PTHREAD = Boolean(options.childThread) -export var reuseWorker = Boolean(options.reuseWorker) export var waitThreadStart = typeof options.waitThreadStart === 'number' ? options.waitThreadStart : Boolean(options.waitThreadStart) export var wasmInstance: WebAssembly.Instance @@ -69,7 +68,7 @@ export var napiModule: INapiModule = { emnapi: {}, loaded: false, filename: '', - childThread: Boolean(options.childThread), + childThread: ENVIRONMENT_IS_PTHREAD, initWorker: undefined!, executeAsyncWork: undefined!, @@ -244,15 +243,20 @@ function emnapiAddSendListener (worker: any): boolean { napiModule.emnapi.addSendListener = emnapiAddSendListener -export var PThread = new ThreadManager({ - printErr: err, - beforeLoad: (worker) => { - emnapiAddSendListener(worker) - }, - reuseWorker, - onCreateWorker: onCreateWorker as ThreadManagerOptions['onCreateWorker'] ?? (() => { - throw new Error('options.onCreateWorker` is not provided') - }) -}) +export var PThread = new ThreadManager( + ENVIRONMENT_IS_PTHREAD + ? { + printErr: err, + childThread: true + } + : { + printErr: err, + beforeLoad: (worker) => { + emnapiAddSendListener(worker) + }, + reuseWorker: options.reuseWorker, + onCreateWorker: onCreateWorker as ThreadManagerOptionsMain['onCreateWorker'] + } +) napiModule.PThread = PThread diff --git a/packages/emnapi/src/core/scope.d.ts b/packages/emnapi/src/core/scope.d.ts index 6e1f463e..5df9bcab 100644 --- a/packages/emnapi/src/core/scope.d.ts +++ b/packages/emnapi/src/core/scope.d.ts @@ -3,9 +3,9 @@ declare interface CreateOptions { filename?: string nodeBinding?: NodeBinding childThread?: boolean - reuseWorker?: boolean + reuseWorker?: ThreadManagerOptionsMain['reuseWorker'] asyncWorkPoolSize?: number - waitThreadStart?: boolean | number + waitThreadStart?: MainThreadBaseOptions['waitThreadStart'] onCreateWorker?: () => any print?: (str: string) => void printErr?: (str: string) => void @@ -15,7 +15,8 @@ declare interface CreateOptions { // factory parameter declare const options: CreateOptions -declare type ThreadManagerOptions = import('../../../wasi-threads/lib/typings/index').ThreadManagerOptions +declare type MainThreadBaseOptions = import('../../../wasi-threads/lib/typings/index').MainThreadBaseOptions +declare type ThreadManagerOptionsMain = import('../../../wasi-threads/lib/typings/index').ThreadManagerOptionsMain declare const ThreadManager: typeof import('../../../wasi-threads/lib/typings/index').ThreadManager // eslint-disable-next-line @typescript-eslint/no-redeclare declare type ThreadManager = import('../../../wasi-threads/lib/typings/index').ThreadManager diff --git a/packages/test/util.js b/packages/test/util.js index 7caf830b..b31a3fe5 100644 --- a/packages/test/util.js +++ b/packages/test/util.js @@ -45,18 +45,26 @@ function loadPath (request, options) { }) const napiModule = createNapiModule({ context, + filename: request, asyncWorkPoolSize: process.env.EMNAPI_TEST_WASI_THREADS ? RUNTIME_UV_THREADPOOL_SIZE : -RUNTIME_UV_THREADPOOL_SIZE, - filename: request, - reuseWorker: true, - waitThreadStart: 1000, - onCreateWorker () { - return new Worker(join(__dirname, './worker.js'), { - env: process.env, - execArgv: ['--experimental-wasi-unstable-preview1'] - }) - }, + ...(process.env.EMNAPI_TEST_WASI_THREADS + ? { + reuseWorker: { + size: RUNTIME_UV_THREADPOOL_SIZE * 4, + strict: true + }, + waitThreadStart: 1000, + onCreateWorker () { + return new Worker(join(__dirname, './worker.js'), { + env: process.env, + execArgv: ['--experimental-wasi-unstable-preview1'] + }) + } + } + : {} + ), ...(options || {}) }) diff --git a/packages/wasi-threads/README.md b/packages/wasi-threads/README.md index 81cbea1c..850cc403 100644 --- a/packages/wasi-threads/README.md +++ b/packages/wasi-threads/README.md @@ -40,7 +40,7 @@ This package makes [wasi-threads proposal](https://github.com/WebAssembly/wasi-t const { WASI } = require('wasi') const Worker = require('worker_threads') const { WASIThreads } = require('@emnapi/wasi-threads') - + const wasi = new WASI({ version: 'preview1' }) @@ -70,12 +70,14 @@ This package makes [wasi-threads proposal](https://github.com/WebAssembly/wasi-t wasi_snapshot_preview1: wasi.wasiImport, ...wasiThreads.getImportObject() }) - + + wasiThreads.setup(instance, module, memory) + await wasiThreads.preloadWorkers() + if (typeof instance.exports._start === 'function') { - const { exitCode } = wasiThreads.start(instance, module, memory) - return exitCode + return wasi.start(instance) } else { - instance = wasiThreads.initialize(instance, module, memory) + wasi.initialize(instance) // instance.exports.exported_wasm_function() } }) diff --git a/packages/wasi-threads/src/index.ts b/packages/wasi-threads/src/index.ts index 708d6b6f..81ffdb49 100644 --- a/packages/wasi-threads/src/index.ts +++ b/packages/wasi-threads/src/index.ts @@ -1,4 +1,13 @@ -export type { ThreadManagerOptions, WorkerLike, WorkerMessageEvent, WorkerFactory } from './thread-manager' +export type { + ReuseWorkerOptions, + ThreadManagerOptionsBase, + ThreadManagerOptionsMain, + ThreadManagerOptionsChild, + ThreadManagerOptions, + WorkerLike, + WorkerMessageEvent, + WorkerFactory +} from './thread-manager' export { ThreadManager } from './thread-manager' export type { @@ -20,6 +29,6 @@ export type { ThreadMessageHandlerOptions } from './worker' export { createInstanceProxy } from './proxy' -export { isTrapError } from './util' +export { isTrapError, isSharedArrayBuffer } from './util' export type { LoadPayload } from './command' diff --git a/packages/wasi-threads/src/thread-manager.ts b/packages/wasi-threads/src/thread-manager.ts index fba7196a..f720c9c2 100644 --- a/packages/wasi-threads/src/thread-manager.ts +++ b/packages/wasi-threads/src/thread-manager.ts @@ -1,5 +1,5 @@ import type { Worker as NodeWorker } from 'worker_threads' -import { ENVIRONMENT_IS_NODE } from './util' +import { ENVIRONMENT_IS_NODE, isSharedArrayBuffer } from './util' import { type MessageEventData, createMessage, type CommandPayloadMap, type CleanupThreadPayload } from './command' /** @public */ @@ -18,17 +18,43 @@ export interface WorkerMessageEvent { export type WorkerFactory = (ctx: { type: string; name: string }) => WorkerLike /** @public */ -export interface ThreadManagerOptions { +export interface ReuseWorkerOptions { + /** + * @see {@link https://emscripten.org/docs/tools_reference/settings_reference.html#pthread-pool-size | PTHREAD_POOL_SIZE} + */ + size: number + + /** + * @see {@link https://emscripten.org/docs/tools_reference/settings_reference.html#pthread-pool-size-strict | PTHREAD_POOL_SIZE_STRICT} + */ + strict?: boolean +} + +/** @public */ +export type ThreadManagerOptions = ThreadManagerOptionsMain | ThreadManagerOptionsChild + +/** @public */ +export interface ThreadManagerOptionsBase { printErr?: (message: string) => void +} + +/** @public */ +export interface ThreadManagerOptionsMain extends ThreadManagerOptionsBase { beforeLoad?: (worker: WorkerLike) => any - reuseWorker?: boolean + reuseWorker?: boolean | number | ReuseWorkerOptions onCreateWorker: WorkerFactory + childThread?: false +} + +/** @public */ +export interface ThreadManagerOptionsChild extends ThreadManagerOptionsBase { + childThread: true } const WASI_THREADS_MAX_TID = 0x1FFFFFFF export function checkSharedWasmMemory (wasmMemory?: WebAssembly.Memory | null): void { - if (typeof SharedArrayBuffer === 'undefined' || (wasmMemory && !(wasmMemory.buffer instanceof SharedArrayBuffer))) { + if (wasmMemory ? !isSharedArrayBuffer(wasmMemory.buffer) : (typeof SharedArrayBuffer === 'undefined')) { throw new Error( 'Multithread features require shared wasm memory. ' + 'Try to compile with `-matomics -mbulk-memory` and use `--import-memory --shared-memory` during linking' @@ -36,6 +62,27 @@ export function checkSharedWasmMemory (wasmMemory?: WebAssembly.Memory | null): } } +function getReuseWorker (value?: boolean | number | ReuseWorkerOptions): false | Required { + if (typeof value === 'boolean') { + return value ? { size: 0, strict: false } : false + } + if (typeof value === 'number') { + if (!(value >= 0)) { + throw new RangeError('reuseWorker: size must be a non-negative integer') + } + return { size: value, strict: false } + } + if (!value) { + return false + } + const size = Number(value.size) ?? 0 + const strict = Boolean(value.strict) + if (!(size > 0) && strict) { + throw new RangeError('reuseWorker: size must be set to positive integer if strict is set to true') + } + return { size, strict } +} + let nextWorkerID = 0 /** @public */ @@ -49,25 +96,96 @@ export class ThreadManager { public wasmMemory: WebAssembly.Memory | null = null private readonly messageEvents = new WeakMap void>>() + private readonly _childThread: boolean private readonly _onCreateWorker: WorkerFactory - private readonly _reuseWorker: boolean + private readonly _reuseWorker: false | Required private readonly _beforeLoad?: (worker: WorkerLike) => any /** @internal */ public readonly printErr: (message: string) => void public constructor (options: ThreadManagerOptions) { - const onCreateWorker = options.onCreateWorker - if (typeof onCreateWorker !== 'function') { - throw new TypeError('`options.onCreateWorker` is not provided') + if (!options) { + throw new TypeError('ThreadManager(): options is not provided') + } + + if ('childThread' in options) { + this._childThread = Boolean(options.childThread) + } else { + this._childThread = false } - this._onCreateWorker = onCreateWorker - this._reuseWorker = options.reuseWorker ?? false - this._beforeLoad = options.beforeLoad + + if (this._childThread) { + this._onCreateWorker = undefined! + this._reuseWorker = false + this._beforeLoad = undefined! + } else { + this._onCreateWorker = (options as ThreadManagerOptionsMain).onCreateWorker + this._reuseWorker = getReuseWorker((options as ThreadManagerOptionsMain).reuseWorker) + this._beforeLoad = (options as ThreadManagerOptionsMain).beforeLoad + } + this.printErr = options.printErr ?? console.error.bind(console) } - public init (): void {} + public init (): void { + if (!this._childThread) { + this.initMainThread() + } + } + + public initMainThread (): void { + this.preparePool() + } + + private preparePool (): void { + if (this._reuseWorker) { + if (this._reuseWorker.size) { + let pthreadPoolSize = this._reuseWorker.size + while (pthreadPoolSize--) { + const worker = this.allocateUnusedWorker() + if (ENVIRONMENT_IS_NODE) { + // https://github.com/nodejs/node/issues/53036 + (worker as NodeWorker).once('message', () => {}); + (worker as NodeWorker).unref() + } + } + } + } + } + + public shouldPreloadWorkers (): boolean { + return !this._childThread && this._reuseWorker && this._reuseWorker.size > 0 + } + + public loadWasmModuleToAllWorkers (): Promise { + const promises: Array> = Array(this.unusedWorkers.length) + for (let i = 0; i < this.unusedWorkers.length; ++i) { + const worker = this.unusedWorkers[i] + if (ENVIRONMENT_IS_NODE) (worker as NodeWorker).ref() + promises[i] = this.loadWasmModuleToWorker(worker).then( + (w) => { + if (ENVIRONMENT_IS_NODE) (worker as NodeWorker).unref() + return w + }, + (e) => { + if (ENVIRONMENT_IS_NODE) (worker as NodeWorker).unref() + throw e + } + ) + } + return Promise.all(promises).catch((err) => { + this.terminateAllThreads() + throw err + }) + } + + public preloadWorkers (): Promise { + if (this.shouldPreloadWorkers()) { + return this.loadWasmModuleToAllWorkers() + } + return Promise.resolve([]) + } public setup (wasmModule: WebAssembly.Module, wasmMemory: WebAssembly.Memory): void { this.wasmModule = wasmModule @@ -175,6 +293,9 @@ export class ThreadManager { public allocateUnusedWorker (): WorkerLike { const _onCreateWorker = this._onCreateWorker + if (typeof _onCreateWorker !== 'function') { + throw new TypeError('`options.onCreateWorker` is not provided') + } const worker = _onCreateWorker({ type: 'thread', name: 'emnapi-pthread' }) this.unusedWorkers.push(worker) return worker @@ -183,6 +304,14 @@ export class ThreadManager { public getNewWorker (sab?: Int32Array): WorkerLike | undefined { if (this._reuseWorker) { if (this.unusedWorkers.length === 0) { + if (this._reuseWorker.strict) { + if (!ENVIRONMENT_IS_NODE) { + const err = this.printErr + err('Tried to spawn a new thread, but the thread pool is exhausted.\n' + + 'This might result in a deadlock unless some threads eventually exit or the code explicitly breaks out to the event loop.') + return + } + } const worker = this.allocateUnusedWorker() // eslint-disable-next-line @typescript-eslint/no-floating-promises this.loadWasmModuleToWorker(worker, sab) @@ -224,21 +353,17 @@ export class ThreadManager { } public terminateAllThreads (): void { - if (this._reuseWorker) { - while (this.runningWorkers.length > 0) { - this.returnWorkerToPool(this.runningWorkers[0]) - } - } else { - for (let i = 0; i < this.runningWorkers.length; ++i) { - this.terminateWorker(this.runningWorkers[i]) - } - for (let i = 0; i < this.unusedWorkers.length; ++i) { - this.terminateWorker(this.unusedWorkers[i]) - } - this.unusedWorkers = [] - this.runningWorkers = [] - this.pthreads = Object.create(null) + for (let i = 0; i < this.runningWorkers.length; ++i) { + this.terminateWorker(this.runningWorkers[i]) + } + for (let i = 0; i < this.unusedWorkers.length; ++i) { + this.terminateWorker(this.unusedWorkers[i]) } + this.unusedWorkers = [] + this.runningWorkers = [] + this.pthreads = Object.create(null) + + this.preparePool() } public addMessageEventListener (worker: WorkerLike, onMessage: (e: WorkerMessageEvent) => void): () => void { diff --git a/packages/wasi-threads/src/util.ts b/packages/wasi-threads/src/util.ts index a69852a7..67091986 100644 --- a/packages/wasi-threads/src/util.ts +++ b/packages/wasi-threads/src/util.ts @@ -65,6 +65,14 @@ export function deserizeErrorFromBuffer (sab: SharedArrayBuffer): Error | null { return error } +/** @public */ +export function isSharedArrayBuffer (value: any): value is SharedArrayBuffer { + return ( + (typeof SharedArrayBuffer === 'function' && value instanceof SharedArrayBuffer) || + (Object.prototype.toString.call(value.constructor) === '[object SharedArrayBuffer]') + ) +} + /** @public */ export function isTrapError (e: Error): e is WebAssembly.RuntimeError { try { diff --git a/packages/wasi-threads/src/wasi-threads.ts b/packages/wasi-threads/src/wasi-threads.ts index 4baeedb4..b71243fb 100644 --- a/packages/wasi-threads/src/wasi-threads.ts +++ b/packages/wasi-threads/src/wasi-threads.ts @@ -1,6 +1,6 @@ import { ENVIRONMENT_IS_NODE, deserizeErrorFromBuffer, getPostMessage, isTrapError } from './util' import { checkSharedWasmMemory, ThreadManager } from './thread-manager' -import type { WorkerMessageEvent, ThreadManagerOptions } from './thread-manager' +import type { WorkerMessageEvent, ThreadManagerOptions, ThreadManagerOptionsMain, WorkerLike } from './thread-manager' import { type CommandPayloadMap, type MessageEventData, createMessage, type SpawnThreadPayload } from './command' import { createInstanceProxy } from './proxy' @@ -30,7 +30,7 @@ export interface MainThreadOptionsWithThreadManager extends MainThreadBaseOption } /** @public */ -export interface MainThreadOptionsCreateThreadManager extends MainThreadBaseOptions, ThreadManagerOptions {} +export interface MainThreadOptionsCreateThreadManager extends MainThreadBaseOptions, ThreadManagerOptionsMain {} /** @public */ export type MainThreadOptions = MainThreadOptionsWithThreadManager | MainThreadOptionsCreateThreadManager @@ -99,6 +99,7 @@ export class WASIThreads { } else { if (!this.childThread) { this.PThread = new ThreadManager(options as ThreadManagerOptions) + this.PThread.init() } } @@ -264,6 +265,13 @@ export class WASIThreads { } } + public preloadWorkers (): Promise { + if (this.PThread) { + return this.PThread.preloadWorkers() + } + return Promise.resolve([]) + } + /** * It's ok to call this method to a WASI command module. *