From 660de9423920fd9300ed693cddad96c9f67b2be4 Mon Sep 17 00:00:00 2001 From: toyobayashi Date: Fri, 10 May 2024 18:16:36 +0800 Subject: [PATCH] update --- packages/core/src/index.ts | 3 +- packages/core/src/load.ts | 11 ++---- packages/wasi-threads/src/index.ts | 1 + packages/wasi-threads/src/thread-manager.ts | 24 ++++++++++--- packages/wasi-threads/src/wasi-threads.ts | 39 +++++++++++++++++++++ packages/wasi-threads/test/index.js | 8 +++-- 6 files changed, 69 insertions(+), 17 deletions(-) diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index ba99e457..0a62635d 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -24,8 +24,7 @@ export type { export type { LoadOptions, InstantiateOptions, - InstantiatedSource, - ReactorWASI + InstantiatedSource } from './load' export type { diff --git a/packages/core/src/load.ts b/packages/core/src/load.ts index a0e3f4ce..6c7947b9 100644 --- a/packages/core/src/load.ts +++ b/packages/core/src/load.ts @@ -1,4 +1,4 @@ -import { WASIThreads, createInstanceProxy } from '@emnapi/wasi-threads' +import { type WASIInstance, WASIThreads, createInstanceProxy } from '@emnapi/wasi-threads' import { type InputType, load, loadSync } from './util' import { createNapiModule } from './emnapi/index' import type { CreateOptions, NapiModule } from './emnapi/index' @@ -8,16 +8,9 @@ export interface InstantiatedSource extends WebAssembly.WebAssemblyInstantiatedS napiModule: NapiModule } -/** @public */ -export interface ReactorWASI { - readonly wasiImport?: Record - initialize (instance: object): void - getImportObject? (): any -} - /** @public */ export interface LoadOptions { - wasi?: ReactorWASI + wasi?: WASIInstance overwriteImports?: (importObject: WebAssembly.Imports) => WebAssembly.Imports beforeInit?: (source: WebAssembly.WebAssemblyInstantiatedSource) => void getMemory?: (exports: WebAssembly.Exports) => WebAssembly.Memory diff --git a/packages/wasi-threads/src/index.ts b/packages/wasi-threads/src/index.ts index dd2b3f01..99051e75 100644 --- a/packages/wasi-threads/src/index.ts +++ b/packages/wasi-threads/src/index.ts @@ -2,6 +2,7 @@ export type { ThreadManagerOptions, WorkerLike, WorkerMessageEvent, WorkerFactor export { ThreadManager } from './thread-manager' export type { + WASIInstance, WASIThreadsOptions, MainThreadOptions, ChildThreadOptions, diff --git a/packages/wasi-threads/src/thread-manager.ts b/packages/wasi-threads/src/thread-manager.ts index 4650d54e..7c45e581 100644 --- a/packages/wasi-threads/src/thread-manager.ts +++ b/packages/wasi-threads/src/thread-manager.ts @@ -35,12 +35,14 @@ export function checkSharedWasmMemory (wasmMemory?: WebAssembly.Memory | null): } } +let nextWorkerID = 0 + /** @public */ export class ThreadManager { public unusedWorkers: WorkerLike[] = [] public runningWorkers: WorkerLike[] = [] public pthreads: Record = Object.create(null) - public nextWorkerID = 0 + public get nextWorkerID (): number { return nextWorkerID } public wasmModule: WebAssembly.Module | null = null public wasmMemory: WebAssembly.Memory | null = null @@ -73,8 +75,8 @@ export class ThreadManager { public markId (worker: WorkerLike): number { if (worker.__emnapi_tid) return worker.__emnapi_tid - const tid = this.nextWorkerID + 43 - this.nextWorkerID = (this.nextWorkerID + 1) % (WASI_THREADS_MAX_TID - 42) + const tid = nextWorkerID + 43 + nextWorkerID = (nextWorkerID + 1) % (WASI_THREADS_MAX_TID - 42) this.pthreads[tid] = worker worker.__emnapi_tid = tid return tid @@ -121,7 +123,9 @@ export class ThreadManager { // err('failed to load in child thread: ' + (payload.err.message || payload.err)) // } } else if (type === 'cleanup-thread') { - this.cleanThread(worker, payload.tid) + if (payload.tid in this.pthreads) { + this.cleanThread(worker, payload.tid) + } } } }; @@ -216,6 +220,18 @@ export class ThreadManager { } } + public terminateAllThreads (): void { + for (let i = 0; i < this.runningWorkers.length; ++i) { + this.terminateWorker(this.runningWorkers[i]) + } + for (let i = 0; i < this.unusedWorkers.length; ++i) { + this.terminateWorker(this.unusedWorkers[i]) + } + this.unusedWorkers = [] + this.runningWorkers = [] + this.pthreads = Object.create(null) + } + public addMessageEventListener (worker: WorkerLike, onMessage: (e: WorkerMessageEvent) => void): () => void { let listeners = this.messageEvents.get(worker) if (!listeners) { diff --git a/packages/wasi-threads/src/wasi-threads.ts b/packages/wasi-threads/src/wasi-threads.ts index dca597e1..485ad377 100644 --- a/packages/wasi-threads/src/wasi-threads.ts +++ b/packages/wasi-threads/src/wasi-threads.ts @@ -2,6 +2,14 @@ import { ENVIRONMENT_IS_NODE, deserizeErrorFromBuffer, getPostMessage } from './ import { checkSharedWasmMemory, ThreadManager } from './thread-manager' import type { WorkerMessageEvent, ThreadManagerOptions } from './thread-manager' +/** @public */ +export interface WASIInstance { + readonly wasiImport?: Record + initialize (instance: object): void + start (instance: object): void + getImportObject? (): any +} + /** @public */ export interface BaseOptions { version?: 'preview1' @@ -46,6 +54,7 @@ export class WASIThreads { private readonly threadSpawn: (startArg: number, errorOrTid?: number) => number private readonly childThread: boolean + private readonly postMessage: ((message: any) => void) | undefined public constructor (options: WASIThreadsOptions) { if (!options) { @@ -80,6 +89,7 @@ export class WASIThreads { if (this.childThread && typeof postMessage !== 'function') { throw new TypeError('options.postMessage is not a function') } + this.postMessage = postMessage const wasm64 = Boolean(options.wasm64) @@ -89,6 +99,8 @@ export class WASIThreads { const payload = e.data.__emnapi__.payload if (type === 'spawn-thread') { threadSpawn(payload.startArg, payload.errorOrTid) + } else if (type === 'terminate-all-threads') { + this.terminateAllThreads() } } } @@ -229,4 +241,31 @@ export class WASIThreads { this.PThread.setup(wasmModule, wasmMemory) } } + + public patchWasiInstance (wasi: T): T { + if (!wasi) return wasi + const wasiImport = wasi.wasiImport + if (wasiImport) { + const proc_exit = wasiImport.proc_exit + // eslint-disable-next-line @typescript-eslint/no-this-alias + const _this = this + wasiImport.proc_exit = function (code: number): number { + _this.terminateAllThreads() + return proc_exit.call(this, code) + } + } + return wasi + } + + public terminateAllThreads (): void { + if (!this.childThread) { + this.PThread?.terminateAllThreads() + } else { + this.postMessage!({ + __emnapi__: { + type: 'terminate-all-threads' + } + }) + } + } } diff --git a/packages/wasi-threads/test/index.js b/packages/wasi-threads/test/index.js index 01f65391..786e59c2 100644 --- a/packages/wasi-threads/test/index.js +++ b/packages/wasi-threads/test/index.js @@ -108,6 +108,7 @@ // optional waitThreadStart: 1000 }) + wasiThreads.patchWasiInstance(wasi) const memory = new WebAssembly.Memory({ initial: 16777216 / 65536, maximum: 2147483648 / 65536, @@ -116,7 +117,8 @@ let input try { input = require('node:fs').readFileSync(require('node:path').join(__dirname, file)) - } catch (_) { + } catch (err) { + console.warn(err) const response = await fetch(file) input = await response.arrayBuffer() } @@ -130,7 +132,9 @@ wasiThreads.setup(instance, module, memory) if (model === ExecutionModel.Command) { - return wasi.start(instance) + const code = wasi.start(instance) + // wasiThreads.terminateAllThreads() + return code } else { wasi.initialize(instance) return instance.exports.fn(1)