WebSockets Cloudflare Pub/Sub & Durable Objects Support #2436

Open
1 task
cliqer opened this issue May 12, 2024 · 1 comment


cliqer commented May 12, 2024

Describe the feature

I have extended the Cloudflare adapter with pub/sub, but I can't get it to work once deployed.
Any pointers on how to fix this, or are there other issues with Cloudflare WebSockets?

In wrangler.toml I added the Durable Object binding and ran it with the cloudflare-module preset:

# wrangler.toml
[durable_objects]
bindings = [
    { name = "TOPIC_DO", class_name = "default" }
]

// adapters/cloudflare.ts

import type * as _cf from "@cloudflare/workers-types";

import { Peer } from "../peer";
import { AdapterOptions, defineWebSocketAdapter } from "../types.js";
import { Message } from "../message";
import { WSError } from "../error";
import { createCrossWS } from "../crossws";
import { toBufferLike } from "../_utils";

type Env = Record<string, any>;

declare const WebSocketPair: typeof _cf.WebSocketPair;
declare const Response: typeof _cf.Response;

export interface CloudflareAdapter {
    handleUpgrade(
        req: _cf.Request,
        env: Env,
        context: _cf.ExecutionContext,
    ): Promise<_cf.Response>;
}

export interface CloudflareOptions extends AdapterOptions {}

const topics = new Map<string, Set<_cf.WebSocket>>();

export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>((options = {}) => {
    const crossws = createCrossWS(options);

    const handleUpgrade = async (req: _cf.Request, env: Env, context: _cf.ExecutionContext) => {
        const pair = new WebSocketPair();
        const client = pair[0];
        const server = pair[1];

        const peer = new CloudflarePeer({
            cloudflare: { client, server, req, env, context },
        });

        const { headers } = await crossws.upgrade(peer);

        server.accept();
        crossws.$callHook("cloudflare:accept", peer);
        crossws.callHook("open", peer);

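        server.addEventListener("message", (event) => {
            // Pub/sub actions arrive as JSON text. Frames that are binary or not
            // valid JSON are skipped here so the crossws hooks below still run.
            let data: any;
            try {
                data = typeof event.data === "string" ? JSON.parse(event.data) : undefined;
            } catch {
                data = undefined;
            }
            if (data?.action === 'publish') {
                publish(data.topic, data.message);
            } else if (data?.action === 'subscribe') {
                subscribe(server, data.topic);
            } else if (data?.action === 'unsubscribe') {
                unsubscribe(server, data.topic);
            }
            crossws.$callHook("cloudflare:message", peer, event);
            crossws.callHook("message", peer, new Message(event.data));
        });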

        server.addEventListener("error", (event) => {
            crossws.$callHook("cloudflare:error", peer, event);
            crossws.callHook("error", peer, new WSError(event.error));
        });

        server.addEventListener("close", (event) => {
            topics.forEach((subscribers, topic) => unsubscribe(server, topic));
            crossws.$callHook("cloudflare:close", peer, event);
            crossws.callHook("close", peer, {
                code: event.code,
                reason: event.reason,
            });
        });

        return new Response(null, {
            status: 101,
            webSocket: client,
            headers,
        });
    };

    return { handleUpgrade };
});

class CloudflarePeer extends Peer<{
    cloudflare: {
        client: _cf.WebSocket;
        server: _cf.WebSocket;
        req: _cf.Request;
        env: Env;
        context: _cf.ExecutionContext;
    };
}> {
    get addr() {
        return undefined;
    }

    get url() {
        return this.ctx.cloudflare.req.url;
    }

    get headers() {
        return this.ctx.cloudflare.req.headers as Headers;
    }

    get readyState() {
        return this.ctx.cloudflare.client.readyState as -1 | 0 | 1 | 2 | 3;
    }

    send(message: any) {
        this.ctx.cloudflare.server.send(toBufferLike(message));
        return 0;
    }

    // Durable Object stubs are fetched with an absolute URL; a bare path such as
    // '/subscribe' is rejected as an invalid URL. The host used below is a placeholder.
    subscribe(topic: string): void {
        const topicId = this.ctx.cloudflare.env.TOPIC_DO.idFromName(topic);
        const topicDO = this.ctx.cloudflare.env.TOPIC_DO.get(topicId);
        topicDO.fetch('https://topic-do/subscribe', { method: 'POST', body: this.ctx.cloudflare.req.cf?.requestId });
    }

    unsubscribe(topic: string): void {
        const topicId = this.ctx.cloudflare.env.TOPIC_DO.idFromName(topic);
        const topicDO = this.ctx.cloudflare.env.TOPIC_DO.get(topicId);
        topicDO.fetch('https://topic-do/unsubscribe', { method: 'POST', body: this.ctx.cloudflare.req.cf?.requestId });
    }

    publish(topic: string, message: any, options?: { compress?: boolean }): void {
        const topicId = this.ctx.cloudflare.env.TOPIC_DO.idFromName(topic);
        const topicDO = this.ctx.cloudflare.env.TOPIC_DO.get(topicId);
        topicDO.fetch('https://topic-do/publish', { method: 'POST', body: JSON.stringify({ topic, message }) });
    }
}

function publish(topic: string, message: string) {
    const subscribers = topics.get(topic);
    if (subscribers) {
        subscribers.forEach(subscriber => {
            subscriber.send(JSON.stringify({ topic, message }));
        });
    }
}

function subscribe(ws: _cf.WebSocket, topic: string) {
    let subscribers = topics.get(topic);
    if (!subscribers) {
        subscribers = new Set();
        topics.set(topic, subscribers);
    }
    subscribers.add(ws);
}

function unsubscribe(ws: _cf.WebSocket, topic: string) {
    const subscribers = topics.get(topic);
    if (subscribers) {
        subscribers.delete(ws);
        if (subscribers.size === 0) {
            topics.delete(topic);
        }
    }
}
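One likely reason the module-level `topics` map behaves differently once deployed: it lives in the memory of a single Worker isolate, and Cloudflare may serve different connections from different isolates, so a publish only reaches subscribers that happen to share the publisher's isolate. A Durable Object gives each topic a single addressable instance that can own the subscriber set. The sketch below shows only that bookkeeping and does not touch the Workers runtime. `TopicRegistry` and `SocketLike` are hypothetical names, not part of crossws or the Workers API, and the class is assumed to be held as a field of the Durable Object behind `TOPIC_DO`.

```typescript
// Hypothetical minimal socket shape; a Workers WebSocket satisfies it structurally.
interface SocketLike {
  send(data: string): void;
}

// Hypothetical helper: per-topic subscriber bookkeeping, intended to live inside
// one Durable Object instance so every connection sees the same state.
class TopicRegistry<S extends SocketLike> {
  private readonly topics = new Map<string, Set<S>>();

  subscribe(socket: S, topic: string): void {
    let subscribers = this.topics.get(topic);
    if (!subscribers) {
      subscribers = new Set<S>();
      this.topics.set(topic, subscribers);
    }
    subscribers.add(socket);
  }

  unsubscribe(socket: S, topic: string): void {
    const subscribers = this.topics.get(topic);
    if (!subscribers) return;
    subscribers.delete(socket);
    if (subscribers.size === 0) this.topics.delete(topic);
  }

  // Remove a closed socket from every topic it joined.
  unsubscribeAll(socket: S): void {
    for (const topic of Array.from(this.topics.keys())) {
      this.unsubscribe(socket, topic);
    }
  }

  // Send to every subscriber of the topic; returns how many sends succeeded.
  publish(topic: string, message: unknown): number {
    const subscribers = this.topics.get(topic);
    if (!subscribers) return 0;
    const payload = JSON.stringify({ topic, message });
    let delivered = 0;
    // Iterate over a copy because failed sockets are removed during the loop.
    for (const socket of Array.from(subscribers)) {
      try {
        socket.send(payload);
        delivered++;
      } catch (_err) {
        // A socket that throws on send is treated as dead and dropped.
        this.unsubscribeAll(socket);
      }
    }
    return delivered;
  }
}
```

Inside a Durable Object, `subscribe` would be called with the server side of the `WebSocketPair` after `accept()`, and `unsubscribeAll` from the `close` listener.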

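A `message` listener that runs `JSON.parse` on `event.data` has to cope with frames that are not valid JSON, including binary frames, because an uncaught throw there would also skip the crossws `message` hook. A minimal sketch of a tolerant parser for the three actions used in the snippet follows. `ClientAction` and `parseAction` are hypothetical names introduced for illustration, and the message shapes are inferred from the handler rather than from any documented protocol.

```typescript
// Hypothetical message shapes matching the actions used in the adapter snippet.
type ClientAction =
  | { action: "subscribe"; topic: string }
  | { action: "unsubscribe"; topic: string }
  | { action: "publish"; topic: string; message: unknown };

// Returns a validated action, or undefined for anything that is not one.
function parseAction(raw: unknown): ClientAction | undefined {
  // Binary frames (ArrayBuffer and similar) are never treated as actions.
  if (typeof raw !== "string") return undefined;

  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch (_err) {
    return undefined; // not JSON at all
  }
  if (typeof data !== "object" || data === null) return undefined;

  const { action, topic, message } = data as Record<string, unknown>;
  if (typeof topic !== "string") return undefined;

  if (action === "subscribe") return { action: "subscribe", topic };
  if (action === "unsubscribe") return { action: "unsubscribe", topic };
  if (action === "publish") return { action: "publish", topic, message };
  return undefined; // unknown action
}
```

With this in place the listener can branch on `parseAction(event.data)` and still forward every frame to the crossws hooks, whether or not it was an action.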
Additional information

  • Would you be willing to help implement this feature?
@TheAlexLichter
Member

Progress via #2801
