diff --git a/packages/common/infra/package.json b/packages/common/infra/package.json index a2a471ac0baf0..89b75151844e8 100644 --- a/packages/common/infra/package.json +++ b/packages/common/infra/package.json @@ -8,6 +8,7 @@ "./storage": "./src/storage/index.ts", "./utils": "./src/utils/index.ts", "./app-config-storage": "./src/app-config-storage.ts", + "./op": "./src/op/index.ts", ".": "./src/index.ts" }, "dependencies": { diff --git a/packages/common/infra/src/op/README.md b/packages/common/infra/src/op/README.md index ba7f92af76bdc..232dbb04fc6e3 100644 --- a/packages/common/infra/src/op/README.md +++ b/packages/common/infra/src/op/README.md @@ -124,10 +124,10 @@ const client = new OpClient(channel); client.listen(); ``` -### MessagePort +### MessageChannel ```ts -const { port1, port2 } = new MessagePort(); +const { port1, port2 } = new MessageChannel(); const client = new OpClient(port1); const consumer = new OpConsumer(port2); diff --git a/packages/common/infra/src/op/client.ts b/packages/common/infra/src/op/client.ts index bdf66b0d0a003..e07f0c73f728e 100644 --- a/packages/common/infra/src/op/client.ts +++ b/packages/common/infra/src/op/client.ts @@ -22,7 +22,7 @@ interface PendingCall extends PromiseWithResolvers { timeout: number | NodeJS.Timeout; } -interface OpClientOptions { +export interface OpClientOptions { timeout?: number; } diff --git a/packages/common/infra/src/op/message.ts b/packages/common/infra/src/op/message.ts index 852cdc6d29b9f..bd06a9e58ed1d 100644 --- a/packages/common/infra/src/op/message.ts +++ b/packages/common/infra/src/op/message.ts @@ -95,6 +95,7 @@ export type MessageCommunicapable = Pick< > & { start?(): void; close?(): void; + terminate?(): void; // For Worker }; export function ignoreUnknownEvent(handler: (data: Messages) => void) { @@ -150,6 +151,7 @@ export abstract class AutoMessageHandler { close() { this.port.close?.(); + this.port.terminate?.(); // For Worker this.port.removeEventListener('message', this.handleMessage); } } diff --git a/packages/frontend/core/src/components/attachment-viewer/utils.ts b/packages/frontend/core/src/components/attachment-viewer/utils.ts index bda4b1000fcad..04e5a7c9479e0 100644 --- a/packages/frontend/core/src/components/attachment-viewer/utils.ts +++ b/packages/frontend/core/src/components/attachment-viewer/utils.ts @@ -1,3 +1,4 @@ +import type { RenderOut } from '@affine/core/modules/pdf/workers/types'; import type { AttachmentBlockModel } from '@blocksuite/affine/blocks'; import { filesize } from 'filesize'; @@ -29,19 +30,18 @@ export async function download(model: AttachmentBlockModel) { export function renderItem( scroller: HTMLElement | null, className: string, - id: number, - width: number, - height: number, - buffer: Uint8ClampedArray + data: RenderOut ) { if (!scroller) return; const item = scroller.querySelector( - `[data-index="${id}"] > div.${className}` + `[data-index="${data.index}"] > div.${className}` ); if (!item) return; if (item.firstElementChild) return; + const { width, height, buffer } = data; + const canvas = document.createElement('canvas'); const ctx = canvas.getContext('2d'); if (!ctx) return; @@ -67,6 +67,13 @@ export function buildAttachmentProps(model: AttachmentBlockModel) { return { model, name, ext, size, isPDF }; } +/** + * Generates a set of sequences. + * + * 1. when `start` is `0`, returns `[0, .., 5]` + * 2. when `end` is `total - 1`, returns `[total - 1, .., total - 5]` + * 2. when `start > 0` and `end < total - 1`, returns `[18, 17, 19, 16, 20, 15, 21]` + */ export function genSeq(start: number, end: number, total: number) { start = Math.max(start, 0); end = Math.min(end, Math.max(total - 1, 0)); diff --git a/packages/frontend/core/src/components/attachment-viewer/viewer.tsx b/packages/frontend/core/src/components/attachment-viewer/viewer.tsx index 3040d04a9e15e..a8fd26e4060de 100644 --- a/packages/frontend/core/src/components/attachment-viewer/viewer.tsx +++ b/packages/frontend/core/src/components/attachment-viewer/viewer.tsx @@ -1,10 +1,12 @@ import { IconButton, observeResize, Scrollable } from '@affine/component'; +import type { Pdf, PdfSender } from '@affine/core/modules/pdf'; +import { PdfsService } from '@affine/core/modules/pdf'; import { - type PDFChannel, - PDFService, - type PDFWorker, -} from '@affine/core/modules/pdf'; -import { MessageOp, RenderKind } from '@affine/core/modules/pdf/workers/types'; + defaultDocInfo, + RenderKind, + type RenderOut, + State, +} from '@affine/core/modules/pdf/workers/types'; import type { AttachmentBlockModel } from '@blocksuite/affine/blocks'; import { CollapseIcon, ExpandIcon } from '@blocksuite/icons/rc'; import { LiveData, useLiveData, useService } from '@toeverything/infra'; @@ -24,7 +26,7 @@ import type { VirtuosoHandle, VirtuosoProps } from 'react-virtuoso'; import { Virtuoso } from 'react-virtuoso'; import * as styles from './styles.css'; -import { genSeq, getAttachmentBlob, renderItem } from './utils'; +import { genSeq, renderItem } from './utils'; type ItemProps = VirtuosoProps; @@ -105,23 +107,20 @@ interface ViewerProps { export const Viewer = ({ model }: ViewerProps): ReactElement => { const { showBoundary } = useErrorBoundary(); - const service = useService(PDFService); - const [worker, setWorker] = useState(null); - const docInfo = useLiveData( + const pdfsService = useService(PdfsService); + const [pdf, setPdf] = useState(null); + const [sender, setSender] = useState(null); + const [cursor, setCursor] = useState(0); + const info = useLiveData( useMemo( () => - worker - ? worker.docInfo$ - : new LiveData({ - total: 0, - width: 1, - height: 1, - }), - [worker] + pdf + ? pdf.info$ + : new LiveData({ state: State.IDLE, ...defaultDocInfo() }), + [pdf] ) ); - const [channel, setChannel] = useState(null); - const [cursor, setCursor] = useState(0); + const [viewportInfo, setViewportInfo] = useState({ dpi: window.devicePixelRatio, width: 1, @@ -135,7 +134,17 @@ export const Viewer = ({ model }: ViewerProps): ReactElement => { startIndex: 0, endIndex: 0, }); - const mainCaches = useMemo>(() => new Set(), []); + const mainRenderingSeq$ = useMemo( + () => + new LiveData<{ + seq: Set; + diff: Set; + }>({ + seq: new Set(), + diff: new Set(), + }), + [] + ); const [collapsed, setCollapsed] = useState(true); const thumbnailsScrollerHandleRef = useRef(null); @@ -144,61 +153,33 @@ export const Viewer = ({ model }: ViewerProps): ReactElement => { startIndex: 0, endIndex: 0, }); - const thumbnailsCaches = useMemo>(() => new Set(), []); + const thumbnailsRenderingSeq$ = useMemo( + () => + new LiveData<{ + seq: Set; + diff: Set; + }>({ + seq: new Set(), + diff: new Set(), + }), + [] + ); const render = useCallback( - ( - id: number, - kind: RenderKind, - width: number, - height: number, - buffer: Uint8ClampedArray - ) => { - const isPage = kind === RenderKind.Page; + (data: RenderOut) => { + const isPage = data.kind === RenderKind.Page; const container = isPage ? scrollerRef : thumbnailsScrollerRef; const name = isPage ? 'page' : 'thumbnail'; - renderItem(container.current, `pdf-${name}`, id, width, height, buffer); + renderItem(container.current, `pdf-${name}`, data); }, [scrollerRef, thumbnailsScrollerRef] ); - const postQueue = useCallback( - (caches: Set, start: number, end: number, kind: RenderKind) => { - if (!channel) return; - - const scale = - viewportInfo.dpi * - (kind === RenderKind.Thumbnail ? THUMBNAIL_WIDTH / docInfo.width : 1); - const seq = new Set(genSeq(start, end, docInfo.total)); - - // fixes doc with only one page - if (seq.size === 1) { - channel.post(MessageOp.Render, { - index: 0, - scale, - kind, - }); - } else { - seq.difference(caches).forEach(index => { - channel.post(MessageOp.Render, { - index, - scale, - kind, - }); - }); - } - - caches.clear(); - seq.forEach(index => caches.add(index)); - }, - [docInfo, viewportInfo, channel] - ); - const onScroll = useCallback(() => { const el = scrollerRef.current; if (!el) return; - const { total } = docInfo; + const { total } = info; if (!total) return; const { scrollTop, scrollHeight } = el; @@ -209,7 +190,7 @@ export const Viewer = ({ model }: ViewerProps): ReactElement => { const cursor = Math.min(index, total - 1); setCursor(cursor); - }, [scrollerRef, docInfo]); + }, [scrollerRef, info]); const onSelect = useCallback( (index: number) => { @@ -232,30 +213,67 @@ export const Viewer = ({ model }: ViewerProps): ReactElement => { [setThumbnailsVisibleRange] ); - useEffect(() => { - const el = viewerRef.current; - if (!el) return; + const updateSeq = useCallback( + ( + range: { startIndex: number; endIndex: number }, + seq$: LiveData<{ + seq: Set; + diff: Set; + }> + ) => { + if (!sender) return; + const { startIndex, endIndex } = range; + const seq = new Set(genSeq(startIndex, endIndex, info.total)); + seq$.next({ + seq, + diff: seq.difference(seq$.value.seq), + }); + }, + [info, sender] + ); - return observeResize(el, entry => { - const rect = entry.contentRect; - setViewportInfo(info => ({ - ...info, - width: rect.width, - height: rect.height, - })); - }); - }, [viewerRef]); + const createRenderingSubscriber = useCallback( + ( + seq$: LiveData<{ + seq: Set; + diff: Set; + }>, + kind: RenderKind + ) => { + if (!sender) return; - useEffect(() => { - const { startIndex, endIndex } = mainVisibleRange; - postQueue(mainCaches, startIndex, endIndex, RenderKind.Page); - }, [postQueue, mainVisibleRange, mainCaches]); + const scale = + (kind === RenderKind.Page ? 1 : THUMBNAIL_WIDTH / info.width) * + viewportInfo.dpi; + + let unsubscribe: () => void; + + const subscriber = seq$.subscribe(({ seq: _, diff }) => { + unsubscribe?.(); + + unsubscribe = sender.subscribe( + 'render', + { seq: Array.from(diff), kind, scale }, + { + next: data => { + if (!data) return; + render(data); + }, + error: err => { + console.error(err); + unsubscribe(); + }, + } + ); + }); - useEffect(() => { - if (collapsed) return; - const { startIndex, endIndex } = thumbnailsVisibleRange; - postQueue(thumbnailsCaches, startIndex, endIndex, RenderKind.Thumbnail); - }, [postQueue, thumbnailsVisibleRange, thumbnailsCaches, collapsed]); + return () => { + unsubscribe?.(); + subscriber.unsubscribe(); + }; + }, + [viewportInfo, info, render, sender] + ); const pageContent = useCallback( (index: number) => { @@ -264,12 +282,12 @@ export const Viewer = ({ model }: ViewerProps): ReactElement => { key={index} index={index} className={clsx([styles.viewerPage, 'pdf-page'])} - width={docInfo.width} - height={docInfo.height} + width={info.width} + height={info.height} /> ); }, - [docInfo] + [info] ); const thumbnailContent = useCallback( @@ -284,12 +302,12 @@ export const Viewer = ({ model }: ViewerProps): ReactElement => { 'pdf-thumbnail', ])} width={THUMBNAIL_WIDTH} - height={Math.ceil((docInfo.height / docInfo.width) * THUMBNAIL_WIDTH)} + height={Math.ceil((info.height / info.width) * THUMBNAIL_WIDTH)} onSelect={onSelect} /> ); }, - [cursor, docInfo, onSelect] + [cursor, info, onSelect] ); const mainComponents = useMemo(() => { @@ -313,21 +331,68 @@ export const Viewer = ({ model }: ViewerProps): ReactElement => { }, []); const increaseViewportBy = useMemo(() => { - const size = Math.min(5, docInfo.total); - const itemHeight = docInfo.height + 20; + const size = Math.min(5, info.total); + const itemHeight = info.height + 20; const height = Math.ceil(size * itemHeight); return { top: height, bottom: height }; - }, [docInfo]); + }, [info]); const mainStyle = useMemo(() => { const { height: vh } = viewportInfo; - const { total: t, height: h, width: w } = docInfo; + const { total: t, height: h, width: w } = info; const height = Math.min( vh - 60 - 24 - 24 - 2 - 8, t * THUMBNAIL_WIDTH * (h / w) + (t - 1) * 12 ); return { height: `${height}px` }; - }, [docInfo, viewportInfo]); + }, [info, viewportInfo]); + + useEffect(() => { + const unsubscribe = createRenderingSubscriber( + mainRenderingSeq$, + RenderKind.Page + ); + return () => { + unsubscribe?.(); + }; + }, [scrollerRef, createRenderingSubscriber, mainRenderingSeq$]); + + useEffect(() => { + const unsubscribe = createRenderingSubscriber( + thumbnailsRenderingSeq$, + RenderKind.Thumbnail + ); + return () => { + unsubscribe?.(); + }; + }, [ + thumbnailsScrollerHandleRef, + createRenderingSubscriber, + thumbnailsRenderingSeq$, + ]); + + useEffect(() => { + const el = viewerRef.current; + if (!el) return; + + return observeResize(el, entry => { + const rect = entry.contentRect; + setViewportInfo(info => ({ + ...info, + width: rect.width, + height: rect.height, + })); + }); + }, [viewerRef]); + + useEffect(() => { + updateSeq(mainVisibleRange, mainRenderingSeq$); + }, [updateSeq, mainVisibleRange, mainRenderingSeq$]); + + useEffect(() => { + if (collapsed) return; + updateSeq(thumbnailsVisibleRange, thumbnailsRenderingSeq$); + }, [collapsed, updateSeq, thumbnailsVisibleRange, thumbnailsRenderingSeq$]); useEffect(() => { scrollerHandleRef.current?.scrollToIndex({ @@ -339,50 +404,41 @@ export const Viewer = ({ model }: ViewerProps): ReactElement => { align: 'start', }); setCursor(0); - mainCaches.clear(); - thumbnailsCaches.clear(); + mainRenderingSeq$.next({ seq: new Set(), diff: new Set() }); + thumbnailsRenderingSeq$.next({ seq: new Set(), diff: new Set() }); setMainVisibleRange({ startIndex: 0, endIndex: 0 }); setThumbnailsVisibleRange({ startIndex: 0, endIndex: 0 }); - }, [channel, mainCaches, thumbnailsCaches]); + }, [sender, mainRenderingSeq$, thumbnailsRenderingSeq$]); useLayoutEffect(() => { - const { worker, release } = service.get(model.id); - - const disposables = worker.on({ - ready: () => { - if (worker.docInfo$.value.total) { - return; - } - - getAttachmentBlob(model) - .then(blob => { - if (!blob) return; - return blob.arrayBuffer(); - }) - .then(buffer => { - if (!buffer) return; - worker.open(buffer); - }) - .catch(showBoundary); - }, - }); + if (!model.sourceId) { + showBoundary('Attachment not found'); + return; + } + + let unsubscribe: () => void; - const channel = worker.channel(); - channel - .on(({ index, kind, height, width, buffer }) => - render(index, kind, width, height, buffer) - ) - .start(); + const { pdf, release } = pdfsService.get(model); - setChannel(channel); - setWorker(worker); + setPdf(pdf); + + const subscriber = pdf.open(model).subscribe({ + error: error => { + console.log(error); + }, + complete: () => { + const { sender, release } = pdf.client.channel(); + setSender(sender); + unsubscribe = release; + }, + }); return () => { - channel.dispose(); - disposables[Symbol.dispose](); + unsubscribe?.(); + subscriber.unsubscribe(); release(); }; - }, [showBoundary, render, service, model]); + }, [showBoundary, pdfsService, model]); return (
{ className={styles.virtuoso} rangeChanged={updateMainVisibleRange} increaseViewportBy={increaseViewportBy} - totalCount={docInfo.total} + totalCount={info.total} itemContent={pageContent} components={mainComponents} /> @@ -421,7 +477,7 @@ export const Viewer = ({ model }: ViewerProps): ReactElement => { }} rangeChanged={updateThumbnailsVisibleRange} className={styles.virtuoso} - totalCount={docInfo.total} + totalCount={info.total} itemContent={thumbnailContent} components={thumbnailsComponents} /> @@ -429,9 +485,9 @@ export const Viewer = ({ model }: ViewerProps): ReactElement => {
- {docInfo.total > 0 ? cursor + 1 : 0} + {info.total > 0 ? cursor + 1 : 0} - /{docInfo.total} + /{info.total}
: } diff --git a/packages/frontend/core/src/modules/pdf/entities/channel.ts b/packages/frontend/core/src/modules/pdf/entities/channel.ts deleted file mode 100644 index 97e9904abcbf8..0000000000000 --- a/packages/frontend/core/src/modules/pdf/entities/channel.ts +++ /dev/null @@ -1,60 +0,0 @@ -import { DebugLogger } from '@affine/debug'; - -import type { - MessageData, - MessageDataMap, - MessageDataType, -} from '../workers/types'; -import { MessageOp } from '../workers/types'; - -const logger = new DebugLogger('affine:workspace:pdf:channel'); - -export class PDFChannel { - constructor( - public readonly id: string, - public readonly port: MessagePort - ) {} - - on(callback: (data: MessageDataMap[MessageOp.Rendered]) => void) { - this.port.addEventListener( - 'message', - ({ data }: MessageEvent) => { - const { type } = data; - if (type !== MessageOp.Rendered) return; - callback(data[type]); - } - ); - return this; - } - - start() { - this.port.start(); - logger.debug('opened', this.id); - } - - post( - type: T, - data?: MessageDataType[T], - transfers?: Transferable[] - ) { - const message = { type }; - if (data) { - Object.assign(message, { [type]: data }); - } - if (transfers?.length) { - this.port.postMessage(message, transfers); - return; - } - this.port.postMessage(message); - } - - dispose() { - this.post(MessageOp.ChannelClose, this.id); - this.port.close(); - logger.debug('closed', this.id); - } - - [Symbol.dispose]() { - this.dispose(); - } -} diff --git a/packages/frontend/core/src/modules/pdf/entities/pdf.ts b/packages/frontend/core/src/modules/pdf/entities/pdf.ts index 259ff420c7346..dc994db07159f 100644 --- a/packages/frontend/core/src/modules/pdf/entities/pdf.ts +++ b/packages/frontend/core/src/modules/pdf/entities/pdf.ts @@ -1,37 +1,25 @@ -import type { WorkspaceService } from '@toeverything/infra'; -import { Entity, ObjectPool } from '@toeverything/infra'; +import type { AttachmentBlockModel } from '@blocksuite/affine/blocks'; +import { Entity, LiveData } from '@toeverything/infra'; -import { PDFWorker } from './worker'; +import { createPdfClient } from '../workers/client'; +import { defaultDocInfo, type DocState, State } from '../workers/types'; -export class PDFEntity extends Entity { - workers = new ObjectPool({ - onDelete(worker) { - worker.dispose(); - }, - }); +export class Pdf extends Entity<{ id: string }> { + public readonly id: string = this.props.id; - constructor(private readonly workspaceService: WorkspaceService) { - super(); - } + public readonly info$ = new LiveData({ + state: State.IDLE, + ...defaultDocInfo(), + }); - get(id: string) { - let result = this.workers.get(id); - if (!result) { - const worker = new PDFWorker(id, this.name); - result = this.workers.put(id, worker); - } - return { worker: result.obj, release: result.release }; - } + public readonly client = createPdfClient(); - get name() { - return this.workspaceService.workspace.id; + open(model: AttachmentBlockModel) { + return this.client.open(model, info => this.info$.next(info)); } - override dispose(): void { - for (const worker of this.workers.objects.values()) { - worker.obj.dispose(); - } - this.workers.clear(); + override dispose() { + this.client.destroy(); super.dispose(); } } diff --git a/packages/frontend/core/src/modules/pdf/entities/pdfs.ts b/packages/frontend/core/src/modules/pdf/entities/pdfs.ts new file mode 100644 index 0000000000000..063602ec33f1b --- /dev/null +++ b/packages/frontend/core/src/modules/pdf/entities/pdfs.ts @@ -0,0 +1,41 @@ +import type { AttachmentBlockModel } from '@blocksuite/affine/blocks'; +import type { WorkspaceService } from '@toeverything/infra'; +import { Entity, ObjectPool } from '@toeverything/infra'; + +import { Pdf } from './pdf'; + +export class Pdfs extends Entity { + pdfs = new ObjectPool({ + onDelete: pdf => { + pdf.dispose(); + }, + }); + + constructor(private readonly workspaceService: WorkspaceService) { + super(); + } + + get(model: AttachmentBlockModel) { + const { id } = model; + + let result = this.pdfs.get(id); + + if (!result) { + const pdf = this.framework.createEntity(Pdf, { id }); + result = this.pdfs.put(id, pdf); + } + + const { obj: pdf, release } = result; + + return { pdf, release }; + } + + get name() { + return this.workspaceService.workspace.id; + } + + override dispose() { + this.pdfs.clear(); + super.dispose(); + } +} diff --git a/packages/frontend/core/src/modules/pdf/entities/worker.ts b/packages/frontend/core/src/modules/pdf/entities/worker.ts deleted file mode 100644 index fe76e87ce1fd7..0000000000000 --- a/packages/frontend/core/src/modules/pdf/entities/worker.ts +++ /dev/null @@ -1,113 +0,0 @@ -import { DebugLogger } from '@affine/debug'; -import { LiveData } from '@toeverything/infra'; -import { nanoid } from 'nanoid'; - -import type { MessageData, MessageDataType } from '../workers/types'; -import { MessageOp, State } from '../workers/types'; -import { PDFChannel } from './channel'; - -const logger = new DebugLogger('affine:workspace:pdf:worker'); - -export class PDFWorker { - public readonly worker: Worker; - - public docInfo$ = new LiveData({ total: 0, width: 1, height: 1 }); - - constructor( - public readonly id: string, - public readonly name: string - ) { - const worker = new Worker( - /* webpackChunkName: "pdf.worker" */ new URL( - '../workers/worker.ts', - import.meta.url - ) - ); - - worker.addEventListener('message', (e: MessageEvent) => { - this.process(e).catch(console.error); - }); - - this.worker = worker; - logger.debug('created'); - } - - async process({ data }: MessageEvent) { - const { type } = data; - - // @ts-expect-error allow - if (type === State.Loaded) { - this.worker.dispatchEvent(new CustomEvent('ready')); - return; - } - - if (type === MessageOp.Opened) { - this.docInfo$.value = data[type]; - this.worker.dispatchEvent(new CustomEvent('opened')); - return; - } - - if (type === MessageOp.Rendered) { - this.worker.dispatchEvent( - new CustomEvent('rendered', { - detail: data[type], - }) - ); - } - } - - on(listeners: Record void>) { - const disposables: Disposable[] = []; - - for (const [type, listener] of Object.entries(listeners)) { - this.worker.addEventListener(type, listener); - - disposables.push({ - [Symbol.dispose]: () => { - this.worker.removeEventListener(type, listener); - }, - }); - } - return { - [Symbol.dispose]: () => { - disposables.forEach(disposable => disposable[Symbol.dispose]()); - }, - }; - } - - // Creates a channel. - channel(id = nanoid()) { - const { port1, port2 } = new MessageChannel(); - this.post(MessageOp.ChannelOpen, id, [port2]); - return new PDFChannel(id, port1); - } - - open(buffer: ArrayBuffer) { - this.post(MessageOp.Open, buffer, [buffer]); - } - - post( - type: T, - data?: MessageDataType[T], - transfers?: Transferable[] - ) { - const message = { type }; - if (data) { - Object.assign(message, { [type]: data }); - } - if (transfers?.length) { - this.worker.postMessage(message, transfers); - return; - } - this.worker.postMessage(message); - } - - dispose() { - this.worker.terminate(); - logger.debug('closed'); - } - - [Symbol.dispose]() { - this.dispose(); - } -} diff --git a/packages/frontend/core/src/modules/pdf/index.ts b/packages/frontend/core/src/modules/pdf/index.ts index 89c97930bd263..d5b1596b11fbe 100644 --- a/packages/frontend/core/src/modules/pdf/index.ts +++ b/packages/frontend/core/src/modules/pdf/index.ts @@ -1,16 +1,18 @@ import type { Framework } from '@toeverything/infra'; import { WorkspaceScope, WorkspaceService } from '@toeverything/infra'; -import { PDFEntity } from './entities/pdf'; -import { PDFService } from './services/pdf'; +import { Pdf } from './entities/pdf'; +import { Pdfs } from './entities/pdfs'; +import { PdfsService } from './services/pdfs'; export function configurePDFModule(framework: Framework) { framework .scope(WorkspaceScope) - .service(PDFService) - .entity(PDFEntity, [WorkspaceService]); + .service(PdfsService) + .entity(Pdfs, [WorkspaceService]) + .entity(Pdf); } -export { PDFChannel } from './entities/channel'; -export { PDFWorker } from './entities/worker'; -export { PDFService } from './services/pdf'; +export { Pdf } from './entities/pdf'; +export { PdfsService } from './services/pdfs'; +export { PdfClient, type PdfSender } from './workers/client'; diff --git a/packages/frontend/core/src/modules/pdf/services/pdf.ts b/packages/frontend/core/src/modules/pdf/services/pdf.ts deleted file mode 100644 index 115403a863627..0000000000000 --- a/packages/frontend/core/src/modules/pdf/services/pdf.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { Service } from '@toeverything/infra'; - -import { PDFEntity } from '../entities/pdf'; - -export class PDFService extends Service { - pdf = this.framework.createEntity(PDFEntity); - - get(id: string) { - return this.pdf.get(id); - } - - override dispose(): void { - this.pdf.dispose(); - super.dispose(); - } -} diff --git a/packages/frontend/core/src/modules/pdf/services/pdfs.ts b/packages/frontend/core/src/modules/pdf/services/pdfs.ts new file mode 100644 index 0000000000000..bcc421b4938df --- /dev/null +++ b/packages/frontend/core/src/modules/pdf/services/pdfs.ts @@ -0,0 +1,20 @@ +import type { AttachmentBlockModel } from '@blocksuite/affine/blocks'; +import { Service } from '@toeverything/infra'; + +import { Pdfs } from '../entities/pdfs'; + +// One PDF document one worker. +// Multiple channels correspond to multiple views. + +export class PdfsService extends Service { + pdfs = this.framework.createEntity(Pdfs); + + get(model: AttachmentBlockModel) { + return this.pdfs.get(model); + } + + override dispose() { + this.pdfs.dispose(); + super.dispose(); + } +} diff --git a/packages/frontend/core/src/modules/pdf/workers/client.ts b/packages/frontend/core/src/modules/pdf/workers/client.ts new file mode 100644 index 0000000000000..5e857a7580ea5 --- /dev/null +++ b/packages/frontend/core/src/modules/pdf/workers/client.ts @@ -0,0 +1,130 @@ +import type { AttachmentBlockModel } from '@blocksuite/affine/blocks'; +import { fromPromise, ObjectPool } from '@toeverything/infra'; +import { OpClient, transfer } from '@toeverything/infra/op'; +import { nanoid } from 'nanoid'; +import { Observable, type Observer } from 'rxjs'; + +import type { ChannelOps, ClientOps } from './ops'; +import { type DocState, State } from './types'; +import { downloadBlobToBuffer } from './utils'; + +export function createPdfClient() { + const worker = new Worker( + /* webpackChunkName: "pdf.worker" */ new URL('./worker.ts', import.meta.url) + ); + + const client = new PdfClient(worker); + client.listen(); + return client; +} + +export type PdfSender = OpClient; + +export class PdfClient extends OpClient { + channels = new ObjectPool({ + onDelete(client) { + client.destroy(); + }, + }); + + private _ping(id: string, subscriber: Observer) { + return this.subscribe('pingpong', { id }, subscriber); + } + + private _open( + id: string, + buffer: ArrayBuffer, + subscriber: Observer + ) { + return this.subscribe( + 'open', + transfer({ id, buffer }, [buffer]), + subscriber + ); + } + + private _downloadBlobToBuffer( + model: AttachmentBlockModel, + subscriber: Partial> + ) { + return fromPromise(downloadBlobToBuffer(model)).subscribe(subscriber); + } + + // Opens a PDF document. + open(model: AttachmentBlockModel, update?: (info: DocState) => void) { + const { id } = model; + const ob$ = new Observable(subscriber => { + const setInfo = (info: DocState) => { + update?.(info); + subscriber.next(info); + }; + const error = (err?: any) => subscriber.error(err); + const complete = () => subscriber.complete(); + + this._ping(id, { + next: info => { + setInfo(info); + + if (info.state === State.Opened) { + complete(); + return; + } + + if (info.state === State.Opening) { + return; + } + + if (info.state === State.Loaded) { + info.state = State.Opening; + setInfo(info); + + this._downloadBlobToBuffer(model, { + next: buffer => + this._open(id, buffer, { + next: info => setInfo(info), + error, + complete, + }), + error: err => subscriber.error(err), + }); + } + }, + error, + complete, + }); + }); + + return ob$; + } + + // Creates a channel. + channel(id = nanoid()) { + let result = this.channels.get(id); + + if (!result) { + const { port1, port2: port } = new MessageChannel(); + const sender = new OpClient(port1); + + this.call('channel', transfer({ id, port }, [port])).catch(err => { + console.error(err); + }); + + result = this.channels.put(id, sender); + + sender.listen(); + } + + const { obj: sender, release } = result; + + return { sender, release }; + } + + override destroy() { + this.channels.clear(); + super.destroy(); + } + + [Symbol.dispose]() { + this.destroy(); + } +} diff --git a/packages/frontend/core/src/modules/pdf/workers/ops.ts b/packages/frontend/core/src/modules/pdf/workers/ops.ts new file mode 100644 index 0000000000000..5ddd446cc6624 --- /dev/null +++ b/packages/frontend/core/src/modules/pdf/workers/ops.ts @@ -0,0 +1,24 @@ +import type { OpSchema } from '@toeverything/infra/op'; + +import type { DocState, RenderKind, RenderOut } from './types'; + +export interface ClientOps extends OpSchema { + // Ping-Pong + pingpong: [{ id: string }, DocState]; + // Opens a PDF document + open: [{ id: string; buffer: ArrayBuffer }, DocState]; + // Creates a channel + channel: [{ id: string; port: MessagePort }, boolean]; +} + +export interface ChannelOps extends OpSchema { + // Renders image data by page index + render: [ + { + seq: number[]; + kind: RenderKind; + scale?: number; + }, + RenderOut | void, + ]; +} diff --git a/packages/frontend/core/src/modules/pdf/workers/types.ts b/packages/frontend/core/src/modules/pdf/workers/types.ts index df0134dceadd2..92392361a91dd 100644 --- a/packages/frontend/core/src/modules/pdf/workers/types.ts +++ b/packages/frontend/core/src/modules/pdf/workers/types.ts @@ -1,8 +1,9 @@ export enum State { IDLE = 0, Loading, - Loaded, - Failed, + Loaded, // WASM has been loaded and initialized. + Opening, + Opened, // A document has been opened. } export type DocInfo = { @@ -11,54 +12,21 @@ export type DocInfo = { height: number; }; -export type ViewportInfo = { - // TODO(@fundon): zoom & scale - dpi: number; - width: number; - height: number; -}; - -export enum MessageOp { - Open = State.Failed + 1, - Opened, - Render, - Rendered, - ChannelOpen, - ChannelClose, -} - export enum RenderKind { Page, Thumbnail, } -export interface MessageDataMap { - [State.IDLE]: undefined; - [State.Loading]: undefined; - [State.Loaded]: undefined; - [State.Failed]: undefined; - [MessageOp.Open]: ArrayBuffer; - [MessageOp.Opened]: DocInfo; - [MessageOp.Render]: { - index: number; - kind: RenderKind; - scale?: number; - }; - [MessageOp.Rendered]: { - index: number; - width: number; - height: number; - kind: RenderKind; - buffer: Uint8ClampedArray; - }; - [MessageOp.ChannelOpen]: string; - [MessageOp.ChannelClose]: string; -} - -export type MessageDataType = { - [P in keyof T]: T[P]; +export type RenderOut = { + index: number; + width: number; + height: number; + kind: RenderKind; + buffer: Uint8ClampedArray; }; -export type MessageData = { - type: T; -} & P; +export type DocState = { state: State } & DocInfo; + +export function defaultDocInfo(total = 1, width = 1, height = 1) { + return { total, width, height }; +} diff --git a/packages/frontend/core/src/modules/pdf/workers/utils.ts b/packages/frontend/core/src/modules/pdf/workers/utils.ts index b4569b13d97ce..40b612bf1e163 100644 --- a/packages/frontend/core/src/modules/pdf/workers/utils.ts +++ b/packages/frontend/core/src/modules/pdf/workers/utils.ts @@ -1,5 +1,20 @@ +import type { AttachmentBlockModel } from '@blocksuite/affine/blocks'; import type { Document, Viewer } from '@toeverything/pdf-viewer'; +export async function downloadBlobToBuffer(model: AttachmentBlockModel) { + const sourceId = model.sourceId; + if (!sourceId) { + throw new Error('Attachment not found'); + } + + const blob = await model.doc.blobSync.get(sourceId); + if (!blob) { + throw new Error('Attachment not found'); + } + + return await blob.arrayBuffer(); +} + export function resizeImageBitmap( imageData: ImageData, options: { diff --git a/packages/frontend/core/src/modules/pdf/workers/worker.ts b/packages/frontend/core/src/modules/pdf/workers/worker.ts index c3c62fee6964e..e7f95656cdfa1 100644 --- a/packages/frontend/core/src/modules/pdf/workers/worker.ts +++ b/packages/frontend/core/src/modules/pdf/workers/worker.ts @@ -1,3 +1,4 @@ +import { OpConsumer, transfer } from '@toeverything/infra/op'; import type { Document } from '@toeverything/pdf-viewer'; import { createPDFium, @@ -5,184 +6,120 @@ import { Runtime, Viewer, } from '@toeverything/pdf-viewer'; +import { BehaviorSubject, filter, from, map, switchMap, take } from 'rxjs'; -import type { MessageData, MessageDataType } from './types'; -import { MessageOp, State } from './types'; +import type { ChannelOps, ClientOps } from './ops'; +import type { DocInfo } from './types'; +import { defaultDocInfo, State } from './types'; import { renderToUint8ClampedArray } from './utils'; -let state = State.IDLE; let viewer: Viewer | null = null; let doc: Document | undefined = undefined; -const docInfo = { total: 0, width: 1, height: 1 }; -const flags = PageRenderingflags.REVERSE_BYTE_ORDER | PageRenderingflags.ANNOT; -const channels = new Set(); +const info: DocInfo = defaultDocInfo(); +const state$ = new BehaviorSubject(State.IDLE); +const FLAGS = PageRenderingflags.REVERSE_BYTE_ORDER | PageRenderingflags.ANNOT; -// Waits for wasm to load and initialize. -async function start() { - if (state !== State.IDLE) return; +// Pipes +const statePipe$ = state$.pipe(map(state => ({ state, ...info }))); - waitForReady({ id: 0 }); +state$.next(State.Loading); - state = State.Loading; +createPDFium() + .then(pdfium => { + viewer = new Viewer(new Runtime(pdfium)); + state$.next(State.Loaded); + }) + .catch(err => { + state$.error(err); + }); - const pdfium = await createPDFium(); - viewer = new Viewer(new Runtime(pdfium)); +// Multiple channels can be processed in a worker. - state = State.Loaded; -} +// @ts-expect-error fixme +const consumer = new OpConsumer(self); -function post( - sender: typeof globalThis | MessagePort, - type: T, - data?: MessageDataType[T], - transfers?: Transferable[] -) { - const message = { type }; - if (data) { - Object.assign(message, { [type]: data }); - } - if (transfers?.length) { - if (sender instanceof MessagePort) { - sender.postMessage(message, transfers); - return; - } - sender.postMessage(message, '*', transfers); - return; - } - sender.postMessage(message); -} - -function waitForReady(tick: { id: number }) { - post(self, state); - if (state === State.Loaded || state === State.Failed) { - if (tick.id) { - clearTimeout(tick.id); - tick.id = 0; - } - return; +consumer.register('pingpong', () => { + return statePipe$; +}); + +consumer.register('open', ({ id: _, buffer }) => { + if (!viewer) { + return statePipe$; } - // @ts-expect-error allow - tick.id = setTimeout(waitForReady, 55, tick); -} - -function rendering( - sender: typeof globalThis | MessagePort, - viewer: Viewer, - doc: Document, - data: MessageDataType[MessageOp.Render] -) { - const { index, kind, scale = 1 } = data; - - if (index < 0 || index >= docInfo.total) return; - - const width = Math.ceil(docInfo.width * scale); - const height = Math.ceil(docInfo.height * scale); - const buffer = renderToUint8ClampedArray( - viewer, - doc, - flags, - index, - width, - height - ); - if (!buffer) return; - - post(sender, MessageOp.Rendered, { index, kind, width, height, buffer }, [ - buffer.buffer, - ]); -} - -function process({ data, ports: [port] }: MessageEvent) { - const { type } = data; - - switch (type) { - case MessageOp.Open: { - if (!viewer) return; - - const buffer = data[type]; - if (!buffer) return; - - // release loaded document - if (doc) { - doc.close(); - } - - doc = viewer.open(new Uint8Array(buffer)); - - if (!doc) return; - - const page = doc.page(0); - - if (!page) return; - - Object.assign(docInfo, { - total: doc.pageCount(), - height: Math.ceil(page.height()), - width: Math.ceil(page.width()), - }); - page.close(); - - post(self, MessageOp.Opened, docInfo); - - break; - } - - case MessageOp.Render: { - queueMicrotask(() => { + + return state$ + .pipe( + take(1), + filter(s => s === State.Loaded) + ) + .pipe( + switchMap(() => { + if (doc) { + doc?.close(); + } + + state$.next(State.Opening); + + doc = viewer?.open(new Uint8Array(buffer)); + + if (!doc) { + Object.assign(info, defaultDocInfo()); + state$.next(State.Loaded); + return statePipe$; + } + + const page = doc.page(0); + if (!page) { + doc.close(); + Object.assign(info, defaultDocInfo()); + state$.next(State.Loaded); + return statePipe$; + } + + const rect = page.size(); + page.close(); + + const total = doc.pageCount(); + + Object.assign(info, { total, ...rect }); + state$.next(State.Opened); + return statePipe$; + }) + ); +}); + +consumer.register('channel', ({ id: _, port }) => { + const receiver = new OpConsumer(port); + + receiver.register('render', ({ seq, kind, scale = 1 }) => { + if (!viewer || !doc) return from([]).pipe(); + + const width = Math.ceil(info.width * scale); + const height = Math.ceil(info.height * scale); + + return from(seq).pipe( + map(index => { if (!viewer || !doc) return; - rendering(self, viewer, doc, data[type]); - }); - - break; - } - - // process only images - case MessageOp.ChannelOpen: { - const id = data[type]; - if (id && port) { - port.addEventListener( - 'message', - ({ data }: MessageEvent) => { - const { type } = data; - - if (type === MessageOp.ChannelClose) { - port.close(); - channels.delete(port); - return; - } - - if (type !== MessageOp.Render) return; - - queueMicrotask(() => { - if (!viewer || !doc) return; - rendering(port, viewer, doc, data[type]); - }); - } - ); - port.start(); - } - break; - } - } -} + const buffer = renderToUint8ClampedArray( + viewer, + doc, + FLAGS, + index, + width, + height + ); + if (!buffer) return; -self.addEventListener('message', process); + return transfer({ index, kind, width, height, buffer }, [ + buffer.buffer, + ]); + }) + ); + }); -start().catch(err => { - if (channels.size > 0) { - for (const channel of channels) { - channel.close(); - } - channels.clear(); - } - if (doc) { - doc.close(); - doc = undefined; - } - if (viewer) { - viewer.close(); - viewer = null; - } - console.error(err); + receiver.listen(); + return true; }); + +consumer.listen();