Skip to content

Commit

Permalink
Merge branch 'release/v0.22.9'
Browse files Browse the repository at this point in the history
  • Loading branch information
holtwick committed Jul 14, 2024
2 parents f077a1d + 5ca730e commit 09f023f
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 32 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "zeed",
"type": "module",
"version": "0.22.8",
"version": "0.22.9",
"description": "🌱 Simple foundation library",
"author": {
"name": "Dirk Holtwick",
Expand Down Expand Up @@ -70,7 +70,7 @@
},
"devDependencies": {
"@antfu/eslint-config": "<2.22",
"@antfu/ni": "^0.21.12",
"@antfu/ni": "<0.22.0",
"@types/node": "^20.14.10",
"@vitejs/plugin-vue": "^5.0.5",
"@vitest/browser": "^2.0.2",
Expand Down
5 changes: 4 additions & 1 deletion src/common/exec/promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ export async function tryTimeout<T>(
})
}

/** Wait for `event` on `obj` to emit. Resolve with result or reject on `timeout` */
/**
* @deprecated use emitter.waitOn
* Wait for `event` on `obj` to emit. Resolve with result or reject on `timeout`
*/
export function waitOn(
obj: any,
event: string,
Expand Down
44 changes: 30 additions & 14 deletions src/common/msg/emitter.spec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import { vi } from 'vitest'
import { detect } from '../platform'
import { sleep, waitOn } from '../exec/promise'
import { getSecureRandomIfPossible } from '../data/math'
import { Emitter, getGlobalEmitter } from './emitter'

const platform = detect()

declare global {
interface ZeedGlobalEmitter {
a: (n: number) => void
Expand All @@ -20,7 +17,8 @@ interface LazyEvent {
obj: any
}

export function lazyListener(
/** @deprecated use waitOn */
function lazyListener(
emitter: any,
listenerKey?: string,
): (key?: string, skipUnmatched?: boolean) => Promise<any> {
Expand Down Expand Up @@ -182,16 +180,34 @@ describe('emitter', () => {
const v = await waitOn(e1, 'f')
expect(v).toBe(1)

if (platform.test) {
await expect(waitOn(e1, 'x', 10)).rejects.toThrow(
'Did not response in time',
)
// } else {
// // https://jasmine.github.io/api/3.5/global
// await expectAsync(on(e1, "x", 10)).toBeRejectedWithError(
// "Did not response in time"
// )
}
await expect(waitOn(e1, 'x', 10)).rejects.toThrow(
'Did not response in time',
)
// } else {
// // https://jasmine.github.io/api/3.5/global
// await expectAsync(on(e1, "x", 10)).toBeRejectedWithError(
// "Did not response in time"
// )
})

it('should wait on integrated', async () => {
expect.assertions(2)

const e1 = new Emitter<{
f: (v: number) => void
x: () => void
}>()

queueMicrotask(() => {
void e1.emit('f', 1)
})

const v = await e1.waitOn('f', 100)
expect(v).toBe(1)

await expect(e1.waitOn('x', 10)).rejects.toThrow(
'Did not response in time',
)
})

it('should work lazy', async () => {
Expand Down
22 changes: 22 additions & 0 deletions src/common/msg/emitter.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { ArgumentsType } from 'vitest'
import type { DisposerFunction } from '../dispose-types'
import { getGlobalContext } from '../global'
import { DefaultLogger } from '../log'
Expand Down Expand Up @@ -146,6 +147,27 @@ export class Emitter<
this.subscribers = {}
return this
}

///

waitOn<U extends keyof LocalListener, R = Parameters<LocalListener[U]>[0]>(
event: U,
timeoutMS = 1000,
): Promise<R> {
return new Promise((resolve, reject) => {
let timer: any

const dispose = this.once(event, ((value): void => {
clearTimeout(timer)
resolve(value)
}) as LocalListener[U])

timer = setTimeout(() => {
dispose()
reject(new Error('Did not response in time'))
}, timeoutMS)
})
}
}

declare global {
Expand Down
30 changes: 15 additions & 15 deletions src/common/msg/rpc.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { MessageChannel } from 'node:worker_threads'
import { decodeJson, encodeJson } from '../bin'
import { cloneObject } from '../data'
import { useStringHashPool } from '../data/string-hash-pool'
Expand Down Expand Up @@ -36,7 +35,7 @@ describe('rpc async', () => {

it('basic', async () => {
const log: any[] = []
const channel = new MessageChannel()
const [c1, c2] = createLocalChannelPair()

const serialize = (data: any) => {
log.push(cloneObject(data))
Expand All @@ -45,8 +44,8 @@ describe('rpc async', () => {
const deserialize = (data: any) => decodeJson(data)

const bob = useRPC<BobFunctions, AliceFunctions>(Bob, {
post: data => channel.port1.postMessage(data),
on: data => channel.port1.on('message', data),
post: data => c1.postMessage(data),
on: data => c1.on('message', e => data(e.data)),
serialize,
deserialize,
stringHashPool: useStringHashPool(),
Expand All @@ -55,9 +54,9 @@ describe('rpc async', () => {
const alice = useRPC<AliceFunctions, BobFunctions>(Alice, {
// mark bob's `bump` as an event without response
eventNames: ['bump'],
post: data => channel.port2.postMessage(data),
post: data => c2.postMessage(data),

on: data => channel.port2.on('message', data),
on: data => c2.on('message', e => data(e.data)),
serialize,
deserialize,
stringHashPool: useStringHashPool(),
Expand All @@ -74,8 +73,8 @@ describe('rpc async', () => {
await new Promise(resolve => setTimeout(resolve, 100))
expect(Bob.getCount()).toBe(1)

channel.port1.close()
channel.port2.close()
c1.close()
c2.close()

expect(log).toMatchInlineSnapshot(`
Array [
Expand Down Expand Up @@ -110,13 +109,14 @@ describe('rpc async', () => {
})

it('hub', async () => {
const channel = new MessageChannel()
const [c1, c2] = createLocalChannelPair()

const serialize = (data: any) => encodeJson(data)
const deserialize = (data: any) => decodeJson(data)

const bobHub = useRPCHub({
post: data => channel.port1.postMessage(data),
on: data => channel.port1.on('message', data),
post: data => c1.postMessage(data),
on: data => c1.on('message', e => data(e.data)),
serialize,
deserialize,
stringHashPool: useStringHashPool(),
Expand All @@ -127,9 +127,9 @@ describe('rpc async', () => {
const alice = useRPC<AliceFunctions, BobFunctions>(Alice, {
// mark bob's `bump` as an event without response
eventNames: ['bump'],
post: data => channel.port2.postMessage(data),
post: data => c2.postMessage(data),

on: data => channel.port2.on('message', data),
on: data => c2.on('message', e => data(e.data)),
serialize,
deserialize,
stringHashPool: useStringHashPool(),
Expand All @@ -146,8 +146,8 @@ describe('rpc async', () => {
await new Promise(resolve => setTimeout(resolve, 100))
expect(Bob.getCount()).toBe(1)

channel.port1.close()
channel.port2.close()
c1.close()
c2.close()
})

it('timeout async', async (done) => {
Expand Down

0 comments on commit 09f023f

Please sign in to comment.