Skip to content

Commit

Permalink
feat(infra): introduce op pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Nov 7, 2024
1 parent add8c56 commit 6c2dbcc
Show file tree
Hide file tree
Showing 11 changed files with 1,236 additions and 0 deletions.
1 change: 1 addition & 0 deletions packages/common/infra/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"@affine/templates": "workspace:*",
"@blocksuite/affine": "0.17.26",
"@datastructures-js/binary-search-tree": "^5.3.2",
"eventemitter2": "^6.4.9",
"foxact": "^0.2.33",
"fractional-indexing": "^3.2.0",
"fuse.js": "^7.0.0",
Expand Down
174 changes: 174 additions & 0 deletions packages/common/infra/src/op/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
# usage

## Register Op Handlers

### Function call handler

```ts
class AddOp extends Op<{ a: number; b: number }, number> {}

// register
const consumer: OpConsumer;
consumer.register(AddOp, ({ a, b }) => a + b);

// call
const producer: OpProducer;
const ret = producer.send(new AddOp({ a: 1, b: 2 })); // Promise<3>
```

### Stream call handler

```ts
class SubscribeStatusOp extends Op<string, string> {}

// register
const consumer: OpConsumer;
consumer.registerSubscribable(SubscribeStatusOp, (name: string) => {
return interval(3000).pipe(map(() => 'connected'));
});

// subscribe
const producer: OpProducer;
producer.subscribe(new SubscribeStatusOp('server'), {
next: status => {
ui.setServerStatus(status);
},
error: error => {
ui.setServerError(error);
},
complete: () => {
//
},
});
```

### Transfer variables

> [Transferable Objects](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Transferable_objects)
#### Producer transferables

```ts
class JobOp extends Op<{ name: string; data: Uint8Array; data2: Uint8Array }, void> {}

const producer: OpProducer;
const data = new Uint8Array([1, 2, 3]);
const nonTransferredData = new Uint8Array([1, 2, 3]);
producer.send(new JobOp({ name: 'compress', data, data2: nonTransferredData }).transfer([data.buffer]));

// after transferring, you can not use the transferred variables anymore!!!
// moved
assertEq(data.byteLength, 0);
// copied
assertEq(nonTransferredData.byteLength, 3);
```

#### Consumer transferables

```ts
class JobOp extends Op<{ id: string }, Uint8Array> {}

const consumer: OpConsumer;
consumer.register(JobOp, ({ id }) => {
const data = new Uint8Array([1, 2, 3]);
return transfer(data, [data.buffer]);
});
consumer.registerSubscribable(JobOp, ({ id }) => {
return interval(3000).pipe(
map(() => {
const data = new Uint8Array([1, 2, 3]);
transfer(data, [data.buffer]);
})
);
});
```

## Communication

### BroadcastChannel

:::CAUTION

BroadcastChannel doesn't support transfer transferable objects. All data passed through it's `postMessage` api would be structured cloned

see [Structured_clone_algorithm](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm)

:::

```ts
const channel = new BroadcastChannel('domain');
const consumer = new OpConsumer(channel);
consumer.listen();

const producer = new OpProducer(channel);
producer.listen();
```

### MessagePort

```ts
const { port1, port2 } = new MessagePort();

const producer = new OpProducer(port1);
const consumer = new OpConsumer(port2);
```

### Worker

```ts
const worker = new Worker('./xxx-worker');
const producer = new OpProducer(worker);

// in worker
const consumer = new OpConsumer(globalThis);
consumer.listen();
```

### SharedWorker

```ts
const worker = new SharedWorker('./xxx-worker');
const producer = new OpProducer(worker.port);

// in worker
globalThis.addEventListener('connect', event => {
const port = event.ports[0];
const consumer = new OpConsumer(port);
consumer.listen();
});
```

## Why Pass Operations by Classes instead of Runtimeless Interfaces

### clean code in caller side

```ts
// class
class XXXOp extends Op<void, void> {}

send(new XXXOp());

// interface
send<Op<void, void>>({});
```

### avoid magic strings & straightforward type checking

```ts
// class
class AddOp extends Op<{ a: number; b: number }, number> {}
send(new AddOp({ a: 1, b: 2 }));

register(AddOp, ({ a, b }) => a + b);

// interface
interface MyOps {
add: [{ a: number; b: number }, number];
// ^^^^^^^^^^^^^^^^ input ^ output
}

type OpFromMyOps<T extends keyof MyOps> = Op</* refer Input */, /* refer Output */>

send<OpFromMyOps<'add'>>('add', { a: 1, b: 2 });
register<OpFromMyOps<'add'>>('add', ({ a, b }) => a + b);
```
185 changes: 185 additions & 0 deletions packages/common/infra/src/op/__tests__/consumer.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
import { afterEach } from 'node:test';

import { Observable } from 'rxjs';
import { beforeEach, describe, expect, it, vi } from 'vitest';

import { OpConsumer, transfer } from '../consumer';
import type { MessageHandlers } from '../message';
import { Op } from '../types';

declare module 'vitest' {
interface TestContext {
consumer: OpConsumer;
handlers: MessageHandlers;
postMessage: ReturnType<typeof vi.fn>;
}
}

class AddOp extends Op<{ a: number; b: number }, number> {
protected override getId(): string {
return 'add';
}
}
class AnyOp extends Op<any, any> {
protected override getId(): string {
return 'any';
}
}

describe('op producer', () => {
beforeEach(ctx => {
const { port2 } = new MessageChannel();
// @ts-expect-error patch postMessage
port2.postMessage = vi.fn(port2.postMessage);
// @ts-expect-error patch postMessage
ctx.postMessage = port2.postMessage;
ctx.consumer = new OpConsumer(port2);
// @ts-expect-error internal api
ctx.handlers = ctx.consumer.handlers;
vi.useFakeTimers();
});

afterEach(() => {
vi.useRealTimers();
});

it('should throw if no handler registered', async ctx => {
ctx.handlers.call(new AddOp({ a: 1, b: 2 }).toCallMessage()[0]);
await vi.advanceTimersToNextTimerAsync();
expect(ctx.postMessage.mock.lastCall).toMatchInlineSnapshot(`
[
{
"error": [Error: Handler for operation [AddOp] is not registered.],
"id": "add",
"type": "return",
},
]
`);
});

it('should handle call message', async ctx => {
ctx.consumer.register(AddOp, ({ a, b }) => a + b);

const op = new AddOp({ a: 1, b: 2 });
ctx.handlers.call(op.toCallMessage()[0]);
await vi.advanceTimersToNextTimerAsync();
expect(ctx.postMessage.mock.calls[0][0]).toMatchInlineSnapshot(`
{
"data": 3,
"id": "add",
"type": "return",
}
`);
});

it('should handle cancel message', async ctx => {
ctx.consumer.register(AddOp, ({ a, b }, { signal }) => {
const { reject, resolve, promise } = Promise.withResolvers<number>();

signal?.addEventListener('abort', () => {
reject(new Error('canceled'));
});

setTimeout(() => {
resolve(a + b);
}, Number.MAX_SAFE_INTEGER);

return promise;
});

const op = new AddOp({ a: 1, b: 2 }).toCallMessage()[0];
ctx.handlers.call(op);
ctx.handlers.cancel({ type: 'cancel', id: op.id });

await vi.advanceTimersByTimeAsync(1);

expect(ctx.postMessage.mock.calls).toMatchInlineSnapshot(`
[
[
{
"error": [Error: canceled],
"id": "add",
"type": "return",
},
],
]
`);
});

it('should transfer transferables in return', async ctx => {
const data = new Uint8Array([1, 2, 3]);
const nonTransferred = new Uint8Array([4, 5, 6]);

ctx.consumer.register(AnyOp, () => {
return transfer({ data: { data, nonTransferred } }, [data.buffer]);
});

const op = new AnyOp({}).toCallMessage()[0];
ctx.handlers.call(op);
await vi.advanceTimersToNextTimerAsync();
expect(ctx.postMessage).toHaveBeenCalledOnce();

expect(data.byteLength).toBe(0);
expect(nonTransferred.byteLength).toBe(3);
});

it('should handle subscribe message', async ctx => {
ctx.consumer.registerSubscribable(AnyOp, data => {
return new Observable(observer => {
data.forEach((v: number) => observer.next(v));
observer.complete();
});
});

const op = new AnyOp(new Uint8Array([1, 2, 3])).toSubscribeMessage()[0];
ctx.handlers.subscribe(op);
await vi.advanceTimersToNextTimerAsync();
expect(ctx.postMessage.mock.calls.map(call => call[0]))
.toMatchInlineSnapshot(`
[
{
"data": 1,
"id": "any",
"type": "next",
},
{
"data": 2,
"id": "any",
"type": "next",
},
{
"data": 3,
"id": "any",
"type": "next",
},
{
"id": "any",
"type": "complete",
},
]
`);
});

it('should handle unsubscribe message', async ctx => {
ctx.consumer.registerSubscribable(AnyOp, data => {
return new Observable(observer => {
data.forEach((v: number) => {
setTimeout(() => {
observer.next(v);
}, 1);
});
setTimeout(() => {
observer.complete();
}, 1);
});
});

const op = new AnyOp(new Uint8Array([1, 2, 3])).toSubscribeMessage()[0];
ctx.handlers.subscribe(op);

ctx.handlers.unsubscribe({ type: 'unsubscribe', id: op.id });

await vi.advanceTimersToNextTimerAsync();
expect(ctx.postMessage.mock.calls).toMatchInlineSnapshot(`[]`);
});
});
Loading

0 comments on commit 6c2dbcc

Please sign in to comment.