diff --git a/packages/common/infra/package.json b/packages/common/infra/package.json index 329feedffc94f..1decf46819438 100644 --- a/packages/common/infra/package.json +++ b/packages/common/infra/package.json @@ -16,6 +16,7 @@ "@affine/templates": "workspace:*", "@blocksuite/affine": "0.17.26", "@datastructures-js/binary-search-tree": "^5.3.2", + "eventemitter2": "^6.4.9", "foxact": "^0.2.33", "fractional-indexing": "^3.2.0", "fuse.js": "^7.0.0", diff --git a/packages/common/infra/src/op/README.md b/packages/common/infra/src/op/README.md new file mode 100644 index 0000000000000..b3d55da548332 --- /dev/null +++ b/packages/common/infra/src/op/README.md @@ -0,0 +1,174 @@ +# usage + +## Register Op Handlers + +### Function call handler + +```ts +class AddOp extends Op<{ a: number; b: number }, number> {} + +// register +const consumer: OpConsumer; +consumer.register(AddOp, ({ a, b }) => a + b); + +// call +const producer: OpProducer; +const ret = producer.send(new AddOp({ a: 1, b: 2 })); // Promise<3> +``` + +### Stream call handler + +```ts +class SubscribeStatusOp extends Op {} + +// register +const consumer: OpConsumer; +consumer.registerSubscribable(SubscribeStatusOp, (name: string) => { + return interval(3000).pipe(map(() => 'connected')); +}); + +// subscribe +const producer: OpProducer; +producer.subscribe(new SubscribeStatusOp('server'), { + next: status => { + ui.setServerStatus(status); + }, + error: error => { + ui.setServerError(error); + }, + complete: () => { + // + }, +}); +``` + +### Transfer variables + +> [Transferable Objects](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Transferable_objects) + +#### Producer transferables + +```ts +class JobOp extends Op<{ name: string; data: Uint8Array; data2: Uint8Array }, void> {} + +const producer: OpProducer; +const data = new Uint8Array([1, 2, 3]); +const nonTransferredData = new Uint8Array([1, 2, 3]); +producer.send(new JobOp({ name: 'compress', data, data2: nonTransferredData }).transfer([data.buffer])); + +// after transferring, you can not use the transferred variables anymore!!! +// moved +assertEq(data.byteLength, 0); +// copied +assertEq(nonTransferredData.byteLength, 3); +``` + +#### Consumer transferables + +```ts +class JobOp extends Op<{ id: string }, Uint8Array> {} + +const consumer: OpConsumer; +consumer.register(JobOp, ({ id }) => { + const data = new Uint8Array([1, 2, 3]); + return transfer(data, [data.buffer]); +}); +consumer.registerSubscribable(JobOp, ({ id }) => { + return interval(3000).pipe( + map(() => { + const data = new Uint8Array([1, 2, 3]); + transfer(data, [data.buffer]); + }) + ); +}); +``` + +## Communication + +### BroadcastChannel + +:::CAUTION + +BroadcastChannel doesn't support transfer transferable objects. All data passed through it's `postMessage` api would be structured cloned + +see [Structured_clone_algorithm](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm) + +::: + +```ts +const channel = new BroadcastChannel('domain'); +const consumer = new OpConsumer(channel); +consumer.listen(); + +const producer = new OpProducer(channel); +producer.listen(); +``` + +### MessagePort + +```ts +const { port1, port2 } = new MessagePort(); + +const producer = new OpProducer(port1); +const consumer = new OpConsumer(port2); +``` + +### Worker + +```ts +const worker = new Worker('./xxx-worker'); +const producer = new OpProducer(worker); + +// in worker +const consumer = new OpConsumer(globalThis); +consumer.listen(); +``` + +### SharedWorker + +```ts +const worker = new SharedWorker('./xxx-worker'); +const producer = new OpProducer(worker.port); + +// in worker +globalThis.addEventListener('connect', event => { + const port = event.ports[0]; + const consumer = new OpConsumer(port); + consumer.listen(); +}); +``` + +## Why Pass Operations by Classes instead of Runtimeless Interfaces + +### clean code in caller side + +```ts +// class +class XXXOp extends Op {} + +send(new XXXOp()); + +// interface +send>({}); +``` + +### avoid magic strings & straightforward type checking + +```ts +// class +class AddOp extends Op<{ a: number; b: number }, number> {} +send(new AddOp({ a: 1, b: 2 })); + +register(AddOp, ({ a, b }) => a + b); + +// interface +interface MyOps { + add: [{ a: number; b: number }, number]; + // ^^^^^^^^^^^^^^^^ input ^ output +} + +type OpFromMyOps = Op + +send>('add', { a: 1, b: 2 }); +register>('add', ({ a, b }) => a + b); +``` diff --git a/packages/common/infra/src/op/__tests__/consumer.spec.ts b/packages/common/infra/src/op/__tests__/consumer.spec.ts new file mode 100644 index 0000000000000..0a317e836c5f1 --- /dev/null +++ b/packages/common/infra/src/op/__tests__/consumer.spec.ts @@ -0,0 +1,185 @@ +import { afterEach } from 'node:test'; + +import { Observable } from 'rxjs'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +import { OpConsumer, transfer } from '../consumer'; +import type { MessageHandlers } from '../message'; +import { Op } from '../types'; + +declare module 'vitest' { + interface TestContext { + consumer: OpConsumer; + handlers: MessageHandlers; + postMessage: ReturnType; + } +} + +class AddOp extends Op<{ a: number; b: number }, number> { + protected override getId(): string { + return 'add'; + } +} +class AnyOp extends Op { + protected override getId(): string { + return 'any'; + } +} + +describe('op producer', () => { + beforeEach(ctx => { + const { port2 } = new MessageChannel(); + // @ts-expect-error patch postMessage + port2.postMessage = vi.fn(port2.postMessage); + // @ts-expect-error patch postMessage + ctx.postMessage = port2.postMessage; + ctx.consumer = new OpConsumer(port2); + // @ts-expect-error internal api + ctx.handlers = ctx.consumer.handlers; + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('should throw if no handler registered', async ctx => { + ctx.handlers.call(new AddOp({ a: 1, b: 2 }).toCallMessage()[0]); + await vi.advanceTimersToNextTimerAsync(); + expect(ctx.postMessage.mock.lastCall).toMatchInlineSnapshot(` + [ + { + "error": [Error: Handler for operation [AddOp] is not registered.], + "id": "add", + "type": "return", + }, + ] + `); + }); + + it('should handle call message', async ctx => { + ctx.consumer.register(AddOp, ({ a, b }) => a + b); + + const op = new AddOp({ a: 1, b: 2 }); + ctx.handlers.call(op.toCallMessage()[0]); + await vi.advanceTimersToNextTimerAsync(); + expect(ctx.postMessage.mock.calls[0][0]).toMatchInlineSnapshot(` + { + "data": 3, + "id": "add", + "type": "return", + } + `); + }); + + it('should handle cancel message', async ctx => { + ctx.consumer.register(AddOp, ({ a, b }, { signal }) => { + const { reject, resolve, promise } = Promise.withResolvers(); + + signal?.addEventListener('abort', () => { + reject(new Error('canceled')); + }); + + setTimeout(() => { + resolve(a + b); + }, Number.MAX_SAFE_INTEGER); + + return promise; + }); + + const op = new AddOp({ a: 1, b: 2 }).toCallMessage()[0]; + ctx.handlers.call(op); + ctx.handlers.cancel({ type: 'cancel', id: op.id }); + + await vi.advanceTimersByTimeAsync(1); + + expect(ctx.postMessage.mock.calls).toMatchInlineSnapshot(` + [ + [ + { + "error": [Error: canceled], + "id": "add", + "type": "return", + }, + ], + ] + `); + }); + + it('should transfer transferables in return', async ctx => { + const data = new Uint8Array([1, 2, 3]); + const nonTransferred = new Uint8Array([4, 5, 6]); + + ctx.consumer.register(AnyOp, () => { + return transfer({ data: { data, nonTransferred } }, [data.buffer]); + }); + + const op = new AnyOp({}).toCallMessage()[0]; + ctx.handlers.call(op); + await vi.advanceTimersToNextTimerAsync(); + expect(ctx.postMessage).toHaveBeenCalledOnce(); + + expect(data.byteLength).toBe(0); + expect(nonTransferred.byteLength).toBe(3); + }); + + it('should handle subscribe message', async ctx => { + ctx.consumer.registerSubscribable(AnyOp, data => { + return new Observable(observer => { + data.forEach((v: number) => observer.next(v)); + observer.complete(); + }); + }); + + const op = new AnyOp(new Uint8Array([1, 2, 3])).toSubscribeMessage()[0]; + ctx.handlers.subscribe(op); + await vi.advanceTimersToNextTimerAsync(); + expect(ctx.postMessage.mock.calls.map(call => call[0])) + .toMatchInlineSnapshot(` + [ + { + "data": 1, + "id": "any", + "type": "next", + }, + { + "data": 2, + "id": "any", + "type": "next", + }, + { + "data": 3, + "id": "any", + "type": "next", + }, + { + "id": "any", + "type": "complete", + }, + ] + `); + }); + + it('should handle unsubscribe message', async ctx => { + ctx.consumer.registerSubscribable(AnyOp, data => { + return new Observable(observer => { + data.forEach((v: number) => { + setTimeout(() => { + observer.next(v); + }, 1); + }); + setTimeout(() => { + observer.complete(); + }, 1); + }); + }); + + const op = new AnyOp(new Uint8Array([1, 2, 3])).toSubscribeMessage()[0]; + ctx.handlers.subscribe(op); + + ctx.handlers.unsubscribe({ type: 'unsubscribe', id: op.id }); + + await vi.advanceTimersToNextTimerAsync(); + expect(ctx.postMessage.mock.calls).toMatchInlineSnapshot(`[]`); + }); +}); diff --git a/packages/common/infra/src/op/__tests__/message.spec.ts b/packages/common/infra/src/op/__tests__/message.spec.ts new file mode 100644 index 0000000000000..677ab0fc239aa --- /dev/null +++ b/packages/common/infra/src/op/__tests__/message.spec.ts @@ -0,0 +1,76 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +import { + AutoMessageHandler, + ignoreUnknownEvent, + KNOWN_MESSAGE_TYPES, + type MessageCommunicapable, + type MessageHandlers, +} from '../message'; + +class CustomMessageHandler extends AutoMessageHandler { + public handlers: Partial = { + call: vi.fn(), + cancel: vi.fn(), + subscribe: vi.fn(), + unsubscribe: vi.fn(), + return: vi.fn(), + next: vi.fn(), + error: vi.fn(), + complete: vi.fn(), + }; +} + +declare module 'vitest' { + interface TestContext { + sendPort: MessageCommunicapable; + receivePort: MessageCommunicapable; + handler: CustomMessageHandler; + } +} + +describe('message', () => { + beforeEach(ctx => { + const listeners: ((event: MessageEvent) => void)[] = []; + ctx.sendPort = { + postMessage: (msg: any) => { + listeners.forEach(listener => { + listener(new MessageEvent('message', { data: msg })); + }); + }, + addEventListener: vi.fn(), + removeEventListener: vi.fn(), + }; + + ctx.receivePort = { + postMessage: vi.fn(), + addEventListener: vi.fn((_event, handler) => { + listeners.push(handler); + }), + removeEventListener: vi.fn(), + }; + ctx.handler = new CustomMessageHandler(ctx.receivePort); + ctx.handler.listen(); + }); + + it('should ignore unknown message type', ctx => { + const handler = vi.fn(); + // @ts-expect-error internal api + ctx.handler.handleMessage = ignoreUnknownEvent(handler); + + ctx.sendPort.postMessage('connected'); + ctx.sendPort.postMessage({ type: 'call1' }); + ctx.sendPort.postMessage(new Uint8Array()); + ctx.sendPort.postMessage(null); + ctx.sendPort.postMessage(undefined); + + expect(handler).not.toHaveBeenCalled(); + }); + + it('should handle known message type', async ctx => { + for (const type of KNOWN_MESSAGE_TYPES) { + ctx.sendPort.postMessage({ type }); + expect(ctx.handler.handlers[type]).toBeCalled(); + } + }); +}); diff --git a/packages/common/infra/src/op/__tests__/op.spec.ts b/packages/common/infra/src/op/__tests__/op.spec.ts new file mode 100644 index 0000000000000..48158bf3fb033 --- /dev/null +++ b/packages/common/infra/src/op/__tests__/op.spec.ts @@ -0,0 +1,41 @@ +import { describe, expect, it } from 'vitest'; + +import { Op } from '../types'; + +class AddOp extends Op<{ a: number; b: number }, number> {} +class BinOp extends Op {} + +describe('op', () => { + it('should generate call message', () => { + const op = new AddOp({ a: 1, b: 2 }); + const [msg, transferables] = op.toCallMessage(); + expect(msg.type).toBe('call'); + expect(msg.name).toBe(AddOp.name); + expect(msg.payload).toEqual({ a: 1, b: 2 }); + expect(transferables).toBeUndefined(); + }); + + it('should generate unique call id', () => { + const op = new AddOp({ a: 1, b: 2 }); + const [msg1, _] = op.toCallMessage(); + const [msg2, __] = op.toCallMessage(); + expect(msg1.id).not.toBe(msg2.id); + }); + + it('should generate subscribe message', () => { + const op = new BinOp(new Uint8Array([1, 2, 3])); + const [msg, transferables] = op.toSubscribeMessage(); + expect(msg.type).toBe('subscribe'); + expect(msg.name).toBe(BinOp.name); + expect(msg.payload).toEqual(new Uint8Array([1, 2, 3])); + expect(transferables).toBeUndefined(); + }); + + it('should return transferables', () => { + const op = new BinOp(new Uint8Array([1, 2, 3])); + const [_, transferables] = op + .transfer([new Uint8Array([4, 5, 6])]) + .toCallMessage(); + expect(transferables).toEqual([new Uint8Array([4, 5, 6])]); + }); +}); diff --git a/packages/common/infra/src/op/__tests__/producer.spec.ts b/packages/common/infra/src/op/__tests__/producer.spec.ts new file mode 100644 index 0000000000000..895394837607d --- /dev/null +++ b/packages/common/infra/src/op/__tests__/producer.spec.ts @@ -0,0 +1,219 @@ +import { afterEach } from 'node:test'; + +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +import type { MessageHandlers } from '../message'; +import { OpProducer } from '../producer'; +import { Op } from '../types'; + +declare module 'vitest' { + interface TestContext { + producer: OpProducer; + handlers: MessageHandlers; + postMessage: ReturnType; + } +} + +class AddOp extends Op<{ a: number; b: number }, number> { + protected override getId(): string { + return 'add'; + } +} +class BinOp extends Op { + protected override getId(): string { + return 'bin'; + } +} +class SubOp extends Op { + protected override getId(): string { + return 'sub'; + } +} + +describe('op producer', () => { + beforeEach(ctx => { + const { port1 } = new MessageChannel(); + // @ts-expect-error patch postMessage + port1.postMessage = vi.fn(port1.postMessage); + // @ts-expect-error patch postMessage + ctx.postMessage = port1.postMessage; + ctx.producer = new OpProducer(port1); + // @ts-expect-error internal api + ctx.handlers = ctx.producer.handlers; + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('should send call op', async ctx => { + // @ts-expect-error internal api + const pendingCalls = ctx.producer.pendingCalls; + const result = ctx.producer.call(new AddOp({ a: 1, b: 2 })); + + expect(ctx.postMessage.mock.calls[0][0]).toMatchInlineSnapshot(` + { + "id": "add", + "name": "AddOp", + "payload": { + "a": 1, + "b": 2, + }, + "type": "call", + } + `); + expect(pendingCalls.has('add')).toBe(true); + + // fake consumer return + ctx.handlers.return({ type: 'return', id: 'add', data: 3 }); + + await expect(result).resolves.toBe(3); + + expect(pendingCalls.has('add')).toBe(false); + }); + + it('should transfer transferables with call op', async ctx => { + const data = new Uint8Array([1, 2, 3]); + const result = ctx.producer.call(new BinOp(data).transfer([data.buffer])); + + expect(ctx.postMessage.mock.calls[0][1].transfer[0]).toBeInstanceOf( + ArrayBuffer + ); + + // fake consumer return + ctx.handlers.return({ + type: 'return', + id: 'bin', + data: new Uint8Array([3, 2, 1]), + }); + + await expect(result).resolves.toEqual(new Uint8Array([3, 2, 1])); + expect(data.byteLength).toBe(0); + }); + + it('should cancel call', async ctx => { + const promise = ctx.producer.call(new AddOp({ a: 1, b: 2 })); + + promise.cancel(); + + expect(ctx.postMessage.mock.lastCall).toMatchInlineSnapshot(` + [ + { + "id": "add", + "type": "cancel", + }, + ] + `); + + await expect(promise).rejects.toThrow('canceled'); + }); + + it('should timeout call', async ctx => { + const promise = ctx.producer.call(new AddOp({ a: 1, b: 2 })); + + vi.advanceTimersByTime(4000); + + await expect(promise).rejects.toThrow('timeout'); + }); + + it('should send subscribe op', async ctx => { + let ob = { + next: vi.fn(), + error: vi.fn(), + complete: vi.fn(), + }; + + // @ts-expect-error internal api + const subscriptions = ctx.producer.obs; + ctx.producer.subscribe(new SubOp(new Uint8Array([1, 2, 3])), ob); + + expect(ctx.postMessage.mock.calls[0][0]).toMatchInlineSnapshot(` + { + "id": "sub", + "name": "SubOp", + "payload": Uint8Array [ + 1, + 2, + 3, + ], + "type": "subscribe", + } + `); + expect(subscriptions.has('sub')).toBe(true); + + // fake consumer return + ctx.handlers.next({ type: 'next', id: 'sub', data: 1 }); + ctx.handlers.next({ type: 'next', id: 'sub', data: 2 }); + ctx.handlers.next({ type: 'next', id: 'sub', data: 3 }); + + expect(subscriptions.has('sub')).toBe(true); + + ctx.handlers.complete({ type: 'complete', id: 'sub' }); + + expect(ob.next).toHaveBeenCalledTimes(3); + expect(ob.complete).toHaveBeenCalledTimes(1); + + expect(subscriptions.has('sub')).toBe(false); + expect(ctx.postMessage.mock.lastCall).toMatchInlineSnapshot(` + [ + { + "id": "sub", + "type": "unsubscribe", + }, + ] + `); + + // smoking + ob = { + next: vi.fn(), + error: vi.fn(), + complete: vi.fn(), + }; + ctx.producer.subscribe(new SubOp(new Uint8Array([1, 2, 3])), ob); + + expect(subscriptions.has('sub')).toBe(true); + + ctx.handlers.next({ type: 'next', id: 'sub', data: 1 }); + ctx.handlers.error({ type: 'error', id: 'sub', error: new Error('test') }); + + expect(ob.next).toHaveBeenCalledTimes(1); + expect(ob.error).toHaveBeenCalledTimes(1); + + expect(subscriptions.has('sub')).toBe(false); + }); + + it('should transfer transferables with subscribe op', async ctx => { + const data = new Uint8Array([1, 2, 3]); + const unsubscribe = ctx.producer.subscribe( + new BinOp(data).transfer([data.buffer]), + { + next: vi.fn(), + } + ); + + expect(data.byteLength).toBe(0); + + unsubscribe(); + }); + + it('should unsubscribe subscription op', ctx => { + const unsubscribe = ctx.producer.subscribe( + new SubOp(new Uint8Array([1, 2, 3])), + { + next: vi.fn(), + } + ); + + unsubscribe(); + + expect(ctx.postMessage.mock.lastCall).toMatchInlineSnapshot(` + [ + { + "id": "sub", + "type": "unsubscribe", + }, + ] + `); + }); +}); diff --git a/packages/common/infra/src/op/consumer.ts b/packages/common/infra/src/op/consumer.ts new file mode 100644 index 0000000000000..81f277469b8e7 --- /dev/null +++ b/packages/common/infra/src/op/consumer.ts @@ -0,0 +1,222 @@ +import EventEmitter2 from 'eventemitter2'; +import type { Observable, Subscription } from 'rxjs'; + +import { + AutoMessageHandler, + type CallMessage, + type MessageHandlers, + type ReturnMessage, + type SubscribeMessage, + type SubscriptionCompleteMessage, + type SubscriptionErrorMessage, + type SubscriptionNextMessage, +} from './message'; +import type { Op, OpInput, OpOutput } from './types'; + +interface OpCallContext { + consumer: OpConsumer; + signal?: AbortSignal; +} + +export type OpHandler> = ( + payload: OpInput, + ctx: OpCallContext +) => Promise> | OpOutput; + +export type OpSubscribableHandler> = ( + payload: OpInput, + ctx: OpCallContext +) => Observable>; + +const TRANSFERABLES_CACHE = new Map(); +export function transfer(data: T, transferables: Transferable[]): T { + TRANSFERABLES_CACHE.set(data, transferables); + return data; +} +function fetchTransferables(data: any): Transferable[] | undefined { + const transferables = TRANSFERABLES_CACHE.get(data); + if (transferables) { + TRANSFERABLES_CACHE.delete(data); + } + + return transferables; +} + +export class OpConsumer extends AutoMessageHandler { + private readonly eventBus = new EventEmitter2(); + + private readonly registeredOpHandlers = new Map< + string, + OpHandler> + >(); + + private readonly registeredSubscribableOpHandlers = new Map< + string, + OpSubscribableHandler> + >(); + + private readonly processing = new Map(); + private readonly subscriptions = new Map(); + + override get handlers() { + return { + call: this.handleCallMessage, + cancel: this.handleCancelMessage, + subscribe: this.handleSubscribeMessage, + unsubscribe: this.handleUnsubscribeMessage, + }; + } + + private readonly handleCallMessage: MessageHandlers['call'] = async msg => { + const abortController = new AbortController(); + this.processing.set(msg.id, abortController); + + try { + this.eventBus.emit(`before:${msg.name}`, msg.payload); + const ret = await this.call(msg, abortController.signal); + this.eventBus.emit(`after:${msg.name}`, msg.payload, ret); + const transferables = fetchTransferables(ret); + this.port.postMessage( + { + type: 'return', + id: msg.id, + data: ret, + } satisfies ReturnMessage, + { transfer: transferables } + ); + } catch (e) { + if (!this.processing.has(msg.id)) { + return; + } + + this.port.postMessage({ + type: 'return', + id: msg.id, + error: e as any, + } satisfies ReturnMessage); + } finally { + this.processing.delete(msg.id); + } + }; + + private readonly handleCancelMessage: MessageHandlers['cancel'] = msg => { + const abortController = this.processing.get(msg.id); + if (!abortController) { + return; + } + + abortController.abort(); + }; + + private readonly handleSubscribeMessage: MessageHandlers['subscribe'] = + msg => { + try { + const subscription = this.ob$(msg).subscribe({ + next: data => { + const transferables = fetchTransferables(data); + this.port.postMessage( + { + type: 'next', + id: msg.id, + data, + } satisfies SubscriptionNextMessage, + { transfer: transferables } + ); + }, + error: error => { + this.port.postMessage({ + type: 'error', + id: msg.id, + error: error as Error, + } satisfies SubscriptionErrorMessage); + }, + complete: () => { + this.port.postMessage({ + type: 'complete', + id: msg.id, + } satisfies SubscriptionCompleteMessage); + }, + }); + + subscription.add(() => { + this.subscriptions.delete(msg.id); + }); + this.subscriptions.set(msg.id, subscription); + } catch (e) { + this.port.postMessage({ + type: 'error', + id: msg.id, + error: e as Error, + } satisfies SubscriptionErrorMessage); + } + }; + + private readonly handleUnsubscribeMessage: MessageHandlers['unsubscribe'] = + msg => { + const subscription = this.subscriptions.get(msg.id); + if (!subscription) { + return; + } + + subscription.unsubscribe(); + }; + + register>( + op: { new (...args: any[]): T }, + handler: OpHandler + ) { + this.registeredOpHandlers.set(op.name, handler); + } + + registerSubscribable>( + op: { new (...args: any[]): T }, + handler: OpSubscribableHandler + ) { + this.registeredSubscribableOpHandlers.set(op.name, handler); + } + + before>( + op: { new (...args: any[]): T }, + handler: (input: OpInput) => void + ) { + this.eventBus.on(`before:${op.name}`, handler); + } + + after>( + op: { new (...args: any[]): T }, + handler: (input: OpInput, output: OpOutput) => void + ) { + this.eventBus.on(`after:${op.name}`, handler); + } + + /** + * @internal + */ + async call(op: CallMessage, signal: AbortSignal) { + const handler = this.registeredOpHandlers.get(op.name); + if (!handler) { + throw new Error(`Handler for operation [${op.name}] is not registered.`); + } + + return handler(op.payload, { consumer: this, signal }); + } + + /** + * @internal + */ + ob$(op: SubscribeMessage): Observable>> { + const handler = this.registeredSubscribableOpHandlers.get(op.name); + if (!handler) { + throw new Error(`Handler for operation [${op.name}] is not registered.`); + } + + return handler(op.payload, { consumer: this }); + } + + destroy() { + this.registeredOpHandlers.clear(); + this.registeredSubscribableOpHandlers.clear(); + this.processing.clear(); + this.subscriptions.clear(); + } +} diff --git a/packages/common/infra/src/op/index.ts b/packages/common/infra/src/op/index.ts new file mode 100644 index 0000000000000..1bd6f3ebf75ab --- /dev/null +++ b/packages/common/infra/src/op/index.ts @@ -0,0 +1,4 @@ +export * from './consumer'; +export type { MessageCommunicapable } from './message'; +export * from './producer'; +export * from './types'; diff --git a/packages/common/infra/src/op/message.ts b/packages/common/infra/src/op/message.ts new file mode 100644 index 0000000000000..e2289576b5848 --- /dev/null +++ b/packages/common/infra/src/op/message.ts @@ -0,0 +1,140 @@ +const PRODUCER_MESSAGE_TYPES = [ + 'call', + 'cancel', + 'subscribe', + 'unsubscribe', +] as const; +const CONSUMER_MESSAGE_TYPES = ['return', 'next', 'error', 'complete'] as const; +export const KNOWN_MESSAGE_TYPES = new Set([ + ...PRODUCER_MESSAGE_TYPES, + ...CONSUMER_MESSAGE_TYPES, +]); + +type MessageType = + | (typeof PRODUCER_MESSAGE_TYPES)[number] + | (typeof CONSUMER_MESSAGE_TYPES)[number]; + +export interface Message { + type: MessageType; +} + +// in +export interface CallMessage extends Message { + type: 'call'; + id: string; + name: string; + payload: any; +} + +export interface CancelMessage extends Message { + type: 'cancel'; + id: string; +} + +export interface SubscribeMessage extends Message { + type: 'subscribe'; + id: string; + name: string; + payload: any; +} + +export interface UnsubscribeMessage extends Message { + type: 'unsubscribe'; + id: string; +} + +// out +export type ReturnMessage = { + type: 'return'; + id: string; +} & ( + | { + data: any; + } + | { + error: Error; + } +); + +export interface SubscriptionNextMessage extends Message { + type: 'next'; + id: string; + data: any; +} + +export interface SubscriptionErrorMessage extends Message { + type: 'error'; + id: string; + error: Error; +} + +export type SubscriptionCompleteMessage = { + type: 'complete'; + id: string; +}; + +export type Messages = + | CallMessage + | CancelMessage + | SubscribeMessage + | UnsubscribeMessage + | ReturnMessage + | SubscriptionNextMessage + | SubscriptionErrorMessage + | SubscriptionCompleteMessage; + +export type MessageHandlers = { + [Type in Messages['type']]: ( + message: Extract + ) => void; +}; + +export type MessageCommunicapable = Pick< + MessagePort, + 'postMessage' | 'addEventListener' | 'removeEventListener' +> & { + start?(): void; + close?(): void; +}; + +export function ignoreUnknownEvent(handler: (data: Messages) => void) { + return (event: MessageEvent) => { + const data = event.data; + + if ( + !data || + typeof data !== 'object' || + typeof data.type !== 'string' || + !KNOWN_MESSAGE_TYPES.has(data.type) + ) { + return; + } + + handler(data as any); + }; +} + +export abstract class AutoMessageHandler { + protected abstract handlers: Partial; + + constructor(protected readonly port: MessageCommunicapable) {} + + protected handleMessage = ignoreUnknownEvent((msg: Messages) => { + const handler = this.handlers[msg.type]; + if (!handler) { + return; + } + + handler(msg as any); + }); + + listen() { + this.port.addEventListener('message', this.handleMessage); + this.port.start?.(); + } + + close() { + this.port.close?.(); + this.port.removeEventListener('message', this.handleMessage); + } +} diff --git a/packages/common/infra/src/op/producer.ts b/packages/common/infra/src/op/producer.ts new file mode 100644 index 0000000000000..a765d7bd70f89 --- /dev/null +++ b/packages/common/infra/src/op/producer.ts @@ -0,0 +1,126 @@ +import { Observable, type Observer } from 'rxjs'; + +import { + AutoMessageHandler, + type CancelMessage, + type MessageHandlers, + type UnsubscribeMessage, +} from './message'; +import type { Op } from './types'; + +export interface CancelablePromise extends Promise { + cancel(): void; +} + +export class OpProducer extends AutoMessageHandler { + private readonly pendingCalls = new Map>(); + private readonly obs = new Map>(); + + protected override get handlers() { + return { + return: this.handleReturnMessage, + next: this.handleSubscriptionNextMessage, + error: this.handleSubscriptionErrorMessage, + complete: this.handleSubscriptionCompleteMessage, + }; + } + + private readonly handleReturnMessage: MessageHandlers['return'] = msg => { + const pending = this.pendingCalls.get(msg.id); + if (!pending) { + return; + } + + if ('error' in msg) { + pending.reject(msg.error); + } else { + pending.resolve(msg.data); + } + this.pendingCalls.delete(msg.id); + }; + + private readonly handleSubscriptionNextMessage: MessageHandlers['next'] = + msg => { + const ob = this.obs.get(msg.id); + if (!ob) { + return; + } + + ob.next(msg.data); + }; + + private readonly handleSubscriptionErrorMessage: MessageHandlers['error'] = + msg => { + const ob = this.obs.get(msg.id); + if (!ob) { + return; + } + + ob.error(msg.error); + }; + + private readonly handleSubscriptionCompleteMessage: MessageHandlers['complete'] = + msg => { + const ob = this.obs.get(msg.id); + if (!ob) { + return; + } + + ob.complete(); + }; + + call(op: Op): CancelablePromise { + const promiseWithResolvers = Promise.withResolvers(); + const [msg, transferables] = op.toCallMessage(); + + const raise = (reason: string) => { + promiseWithResolvers.reject(new Error(reason)); + this.pendingCalls.delete(msg.id); + }; + + const promise = promiseWithResolvers.promise as CancelablePromise; + + promise.cancel = () => { + this.port.postMessage({ + type: 'cancel', + id: msg.id, + } satisfies CancelMessage); + + raise('canceled'); + }; + + setTimeout(() => { + raise('timeout'); + }, 3000 /* TODO: make it configurable */); + + this.port.postMessage(msg, { transfer: transferables }); + this.pendingCalls.set(msg.id, promiseWithResolvers); + + return promise; + } + + subscribe( + op: Op, + observer: Partial> | ((value: Out) => void) + ): () => void { + const [msg, transferables] = op.toSubscribeMessage(); + + const sub = new Observable(ob => { + this.obs.set(msg.id, ob); + }).subscribe(observer); + + sub.add(() => { + this.obs.delete(msg.id); + this.port.postMessage({ + type: 'unsubscribe', + id: msg.id, + } satisfies UnsubscribeMessage); + }); + + this.port.postMessage(msg, { transfer: transferables }); + + return () => { + sub.unsubscribe(); + }; + } +} diff --git a/packages/common/infra/src/op/types.ts b/packages/common/infra/src/op/types.ts new file mode 100644 index 0000000000000..ee18064b7b526 --- /dev/null +++ b/packages/common/infra/src/op/types.ts @@ -0,0 +1,48 @@ +import type { CallMessage, SubscribeMessage } from './message'; + +export class Op { + // type holder + protected readonly _out?: Out; + private transferables?: Transferable[]; + + constructor(public readonly payload: In) {} + + protected getId() { + return Math.random().toString(36).slice(2, 9); + } + + transfer(transferables: Transferable[]) { + this.transferables = transferables; + return this; + } + + toCallMessage(): [CallMessage, Transferable[] | undefined] { + return [ + { + type: 'call', + id: this.getId(), + name: this.constructor.name, + payload: this.payload, + }, + this.transferables, + ]; + } + + toSubscribeMessage(): [SubscribeMessage, Transferable[] | undefined] { + return [ + { + type: 'subscribe', + id: this.getId(), + name: this.constructor.name, + payload: this.payload, + }, + this.transferables, + ]; + } +} + +export type OpInput> = + OpType extends Op ? (In extends void ? never : In) : never; + +export type OpOutput> = + OpType extends Op ? Out : never;