From d2e2ec8716ece2cea1f9a335cc7ca62e0a149149 Mon Sep 17 00:00:00 2001 From: Rahul Sethi <5822355+RamIdeas@users.noreply.github.com> Date: Tue, 24 Sep 2024 17:01:40 +0100 Subject: [PATCH] wip: multi-worker wrangler dev --- packages/wrangler/src/api/dev.ts | 3 +- .../startDevWorker/LocalRuntimeController.ts | 10 ++- .../src/api/startDevWorker/ProxyController.ts | 22 +++-- .../wrangler/src/api/startDevWorker/index.ts | 82 +++++++++++++++++++ packages/wrangler/src/dev.tsx | 57 +++++++++---- packages/wrangler/src/dev/hotkeys.ts | 19 +++-- packages/wrangler/src/dev/local.tsx | 39 +++++++-- .../templates/startDevWorker/ProxyWorker.ts | 7 +- 8 files changed, 192 insertions(+), 47 deletions(-) diff --git a/packages/wrangler/src/api/dev.ts b/packages/wrangler/src/api/dev.ts index ecf5dcc506e3..d7d580fffa59 100644 --- a/packages/wrangler/src/api/dev.ts +++ b/packages/wrangler/src/api/dev.ts @@ -176,7 +176,8 @@ export async function unstable_dev( onReady: (address, port, proxyData) => { readyResolve({ address, port, proxyData }); }, - config: options?.config, + // @ts-expect-error who cares + config: options?.config === undefined ? undefined : [options.config], env: options?.env, processEntrypoint, additionalModules, diff --git a/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts b/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts index be72911bbb70..5aa56087b20b 100644 --- a/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts +++ b/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts @@ -6,6 +6,7 @@ import * as MF from "../../dev/miniflare"; import { logger } from "../../logger"; import { RuntimeController } from "./BaseController"; import { castErrorCause } from "./events"; +import { ProxyControllerLogger } from "./ProxyController"; import { convertBindingsToCfWorkerInitBindings } from "./utils"; import type { WorkerEntrypointsDefinition } from "../../dev-registry"; import type { @@ -146,12 +147,19 @@ export class LocalRuntimeController extends RuntimeController { async #onBundleComplete(data: BundleCompleteEvent, id: number) { try { - const { options, internalObjects, entrypointNames } = + // eslint-disable-next-line prefer-const + let { options, internalObjects, entrypointNames } = await MF.buildMiniflareOptions( this.#log, await convertToConfigBundle(data), this.#proxyToUserWorkerAuthenticationSecret ); + options = { + ...options, + log: new ProxyControllerLogger(MF.castLogLevel(logger.loggerLevel), { + prefix: `wrangler-${data.config.name}`, + }), + }; options.liveReload = false; // TODO: set in buildMiniflareOptions once old code path is removed if (this.#mf === undefined) { logger.log(chalk.dim("⎔ Starting local server...")); diff --git a/packages/wrangler/src/api/startDevWorker/ProxyController.ts b/packages/wrangler/src/api/startDevWorker/ProxyController.ts index 8243afb33954..4b4fed27d558 100644 --- a/packages/wrangler/src/api/startDevWorker/ProxyController.ts +++ b/packages/wrangler/src/api/startDevWorker/ProxyController.ts @@ -3,6 +3,7 @@ import { randomUUID } from "node:crypto"; import events from "node:events"; import path from "node:path"; import { LogLevel, Miniflare, Mutex, Response } from "miniflare"; +import { fetch } from "undici"; import inspectorProxyWorkerPath from "worker:startDevWorker/InspectorProxyWorker"; import proxyWorkerPath from "worker:startDevWorker/ProxyWorker"; import WebSocket from "ws"; @@ -106,6 +107,7 @@ export class ProxyController extends Controller { }, bindings: { PROXY_CONTROLLER_AUTH_SECRET: this.secret, + NAME: this.latestConfig.name ?? "", }, // no need to use file-system, so don't @@ -158,7 +160,7 @@ export class ProxyController extends Controller { log: new ProxyControllerLogger(castLogLevel(logger.loggerLevel), { prefix: // if debugging, log requests with specic ProxyWorker prefix - logger.loggerLevel === "debug" ? "wrangler-ProxyWorker" : "wrangler", + `wrangler-ProxyWorker-${this.latestConfig.name}`, }), handleRuntimeStdio, liveReload: false, @@ -286,20 +288,22 @@ export class ProxyController extends Controller { try { await this.runtimeMessageMutex.runWith(async () => { - const { proxyWorker } = await this.ready.promise; + const { proxyWorker, url } = await this.ready.promise; const ready = await proxyWorker.ready.catch(() => undefined); if (!ready) { return; } - return proxyWorker.dispatchFetch( - `http://dummy/cdn-cgi/ProxyWorker/${message.type}`, - { - headers: { Authorization: this.secret }, - cf: { hostMetadata: message }, - } + const proxyWorkerUrl = new URL( + `/cdn-cgi/ProxyWorker/${message.type}`, + url ); + return fetch(proxyWorkerUrl, { + method: "POST", + headers: { Authorization: this.secret }, + body: JSON.stringify(message), + }); }); } catch (cause) { if (this._torndown) { @@ -317,7 +321,7 @@ export class ProxyController extends Controller { error ); - throw error; + // throw error; } } async sendMessageToInspectorProxyWorker( diff --git a/packages/wrangler/src/api/startDevWorker/index.ts b/packages/wrangler/src/api/startDevWorker/index.ts index 7b968a72f619..e5782dc60908 100644 --- a/packages/wrangler/src/api/startDevWorker/index.ts +++ b/packages/wrangler/src/api/startDevWorker/index.ts @@ -1,4 +1,8 @@ +import assert from "node:assert"; +import { updateDevEnvRegistry } from "../../dev"; +import { serializeWorkerRegistryDefinition } from "../../dev/local"; import { DevEnv } from "./DevEnv"; +import type { WorkerDefinition } from "../../dev-registry"; import type { StartDevWorkerInput, Worker } from "./types"; export { DevEnv }; @@ -12,3 +16,81 @@ export async function startWorker( return devEnv.startWorker(options); } + +export async function startMultiWorker( + optionsArray: StartDevWorkerInput[], + devEnv0: DevEnv +): Promise { + const workerRegistry = new Map(); + let prevRegistry: Record = {}; + async function updateWorkerRegistry( + name: string, + definition: WorkerDefinition + ) { + workerRegistry.set(name, definition); + + if (!devEnvs) { + return; + } + + const nextRegistry = Object.fromEntries(workerRegistry); + + if (JSON.stringify(prevRegistry) !== JSON.stringify(nextRegistry)) { + prevRegistry = nextRegistry; + await Promise.all( + devEnvs.map(async (devEnv) => { + await updateDevEnvRegistry( + devEnv, + Object.fromEntries(workerRegistry) + ); + }) + ); + } + } + + const devEnvs = await Promise.all( + optionsArray.map(async (options, workerIndex) => { + const devEnv = workerIndex === 0 ? devEnv0 : new DevEnv(); + + devEnv.runtimes.forEach((runtime) => { + runtime.on("reloadComplete", async (reloadEvent) => { + if (!reloadEvent.config.dev?.remote) { + const { url } = await devEnv.proxy.ready.promise; + const { name } = reloadEvent.config; + assert(name); // default value "multi-worker-n" is defined below + + const definition = serializeWorkerRegistryDefinition( + url, + name, + reloadEvent.proxyData.internalDurableObjects, + reloadEvent.proxyData.entrypointAddresses + ); + + if (definition) { + await updateWorkerRegistry(name, definition); + } + } + }); + }); + + await devEnv.config.set({ + // name: `multi-worker-${workerIndex + 1}`, + ...options, + dev: { + ...options.dev, + remote: false, + inspector: { port: 0 }, + server: { + ...options.dev?.server, + // port: options.dev?.server?.port ?? 0, + // hostname: "localhost", + }, + }, + }); + + return devEnv; + }) + ); + + return devEnvs; +} diff --git a/packages/wrangler/src/dev.tsx b/packages/wrangler/src/dev.tsx index 53c98cb7482f..3b37b5e74da9 100644 --- a/packages/wrangler/src/dev.tsx +++ b/packages/wrangler/src/dev.tsx @@ -5,7 +5,7 @@ import util from "node:util"; import { isWebContainer } from "@webcontainer/env"; import { watch } from "chokidar"; import { render } from "ink"; -import { DevEnv } from "./api"; +import { DevEnv, startMultiWorker } from "./api"; import { convertCfWorkerInitBindingstoBindings, extractBindingsOfType, @@ -64,6 +64,7 @@ import type { } from "./config/environment"; import type { CfModule, CfWorkerInit } from "./deployment-bundle/worker"; import type { WorkerRegistry } from "./dev-registry"; +import type { ExperimentalAssetsOptions } from "./experimental-assets"; import type { LoggerLevel } from "./logger"; import type { EnablePagesAssetsServiceBindingOptions } from "./miniflare-cli/types"; import type { @@ -76,6 +77,14 @@ import type React from "react"; export function devOptions(yargs: CommonYargsArgv) { return ( yargs + .option("config", { + alias: "c", + describe: + "Path to .toml configuration file. Set mutliple times for multi-worker support.", + type: "string", + array: true, + requiresArg: true, + }) .positional("script", { describe: "The path to an entry point for your worker", type: "string", @@ -398,10 +407,16 @@ This is currently not supported 😭, but we think that we'll get it to work soo () => startDev(args) ); if (args.experimentalDevEnv) { - assert(devInstance instanceof DevEnv); - await events.once(devInstance, "teardown"); + if (devInstance instanceof DevEnv) { + await events.once(devInstance, "teardown"); + } else if (Array.isArray(devInstance)) { + await Promise.all( + devInstance.map((worker) => events.once(worker, "teardown")) + ); + } } else { assert(!(devInstance instanceof DevEnv)); + assert(!Array.isArray(devInstance)); configFileWatcher = devInstance.configFileWatcher; assetsWatcher = devInstance.assetsWatcher; @@ -467,7 +482,7 @@ export type StartDevOptions = DevArguments & onReady?: (ip: string, port: number, proxyData: ProxyData) => void; }; -async function updateDevEnvRegistry( +export async function updateDevEnvRegistry( devEnv: DevEnv, registry: WorkerRegistry | undefined ) { @@ -547,7 +562,7 @@ export async function startDev(args: StartDevOptions) { let rerender: (node: React.ReactNode) => void | undefined; try { const configPath = - args.config || + args.config?.[0] || (args.script && findWranglerToml(path.dirname(args.script))); let config = readConfig(configPath, args); @@ -654,13 +669,7 @@ export async function startDev(args: StartDevOptions) { ); }); } - - let unregisterHotKeys: () => void; - if (isInteractive() && args.showInteractiveDevSession !== false) { - unregisterHotKeys = registerDevHotKeys(devEnv, args); - } - - await devEnv.config.set({ + const startWorkerInput: StartDevWorkerInput = { name: args.name, config: configPath, entrypoint: args.script, @@ -730,7 +739,7 @@ export async function startDev(args: StartDevOptions) { if (!accountId) { unregisterHotKeys?.(); accountId = await requireAuth({}); - unregisterHotKeys = registerDevHotKeys(devEnv, args); + unregisterHotKeys = registerDevHotKeys(devEnvs, args); } return { @@ -790,7 +799,24 @@ export async function startDev(args: StartDevOptions) { // otherwise config at startup ends up overriding future config changes in the // ConfigController assets: args.assets ? assetsOptions : undefined, - } satisfies StartDevWorkerInput); + }; + + assert(args.config); + logger.log(args.config); + const options = await Promise.all( + args.config.map((configPath1, workerIndex) => + workerIndex === 0 ? startWorkerInput : { config: configPath1 } + ) + ); + + const devEnvs = await startMultiWorker(options, devEnv); + args.disableDevRegistry = args.config.length === 1; + args.forceLocal ||= args.config.length > 1; + + let unregisterHotKeys = () => {}; + if (isInteractive() && args.showInteractiveDevSession !== false) { + unregisterHotKeys = registerDevHotKeys(devEnvs, args); + } void metrics.sendMetricsEvent( "run dev", @@ -1031,7 +1057,8 @@ export async function startApiDev(args: StartDevOptions) { } const configPath = - args.config || (args.script && findWranglerToml(path.dirname(args.script))); + args.config?.[0] || + (args.script && findWranglerToml(path.dirname(args.script))); const projectRoot = configPath && path.dirname(configPath); const config = readConfig(configPath, args); diff --git a/packages/wrangler/src/dev/hotkeys.ts b/packages/wrangler/src/dev/hotkeys.ts index c453966e43ce..5b7c50f7bc7e 100644 --- a/packages/wrangler/src/dev/hotkeys.ts +++ b/packages/wrangler/src/dev/hotkeys.ts @@ -5,15 +5,16 @@ import { openInspector } from "./inspect"; import type { DevEnv } from "../api"; export default function registerDevHotKeys( - devEnv: DevEnv, + devEnvs: DevEnv[], args: { forceLocal?: boolean } ) { + const primaryDevEnv = devEnvs[0]; const unregisterHotKeys = registerHotKeys([ { keys: ["b"], label: "open a browser", handler: async () => { - const { url } = await devEnv.proxy.ready.promise; + const { url } = await primaryDevEnv.proxy.ready.promise; await openInBrowser(url.href); }, }, @@ -21,12 +22,12 @@ export default function registerDevHotKeys( keys: ["d"], label: "open devtools", handler: async () => { - const { inspectorUrl } = await devEnv.proxy.ready.promise; + const { inspectorUrl } = await primaryDevEnv.proxy.ready.promise; // TODO: refactor this function to accept a whole URL (not just .port and assuming .hostname) await openInspector( parseInt(inspectorUrl.port), - devEnv.config.latestConfig?.name + primaryDevEnv.config.latestConfig?.name ); }, }, @@ -34,12 +35,12 @@ export default function registerDevHotKeys( keys: ["l"], disabled: () => args.forceLocal ?? false, label: () => - `turn ${devEnv.config.latestConfig?.dev?.remote ? "on" : "off"} local mode`, + `turn ${primaryDevEnv.config.latestConfig?.dev?.remote ? "on" : "off"} local mode`, handler: async () => { - await devEnv.config.patch({ + await primaryDevEnv.config.patch({ dev: { - ...devEnv.config.latestConfig?.dev, - remote: !devEnv.config.latestConfig?.dev?.remote, + ...primaryDevEnv.config.latestConfig?.dev, + remote: !primaryDevEnv.config.latestConfig?.dev?.remote, }, }); }, @@ -56,7 +57,7 @@ export default function registerDevHotKeys( label: "to exit", handler: async () => { unregisterHotKeys(); - await devEnv.teardown(); + await Promise.allSettled(devEnvs.map((devEnv) => devEnv.teardown())); }, }, ]); diff --git a/packages/wrangler/src/dev/local.tsx b/packages/wrangler/src/dev/local.tsx index 60a9a58410cf..7a55820e2e1b 100644 --- a/packages/wrangler/src/dev/local.tsx +++ b/packages/wrangler/src/dev/local.tsx @@ -14,6 +14,7 @@ import type { CfWorkerInit, } from "../deployment-bundle/worker"; import type { + WorkerDefinition, WorkerEntrypointsDefinition, WorkerRegistry, } from "../dev-registry"; @@ -113,16 +114,12 @@ export async function localPropsToConfigBundle( }; } -export function maybeRegisterLocalWorker( +export function serializeWorkerRegistryDefinition( url: URL, - name: string | undefined, + name: string, internalDurableObjects: CfDurableObject[] | undefined, entrypointAddresses: WorkerEntrypointsDefinition | undefined -) { - if (name === undefined) { - return; - } - +): WorkerDefinition | undefined { let protocol = url.protocol; protocol = protocol.substring(0, url.protocol.length - 1); if (protocol !== "http" && protocol !== "https") { @@ -130,7 +127,7 @@ export function maybeRegisterLocalWorker( } const port = parseInt(url.port); - return registerWorker(name, { + return { protocol, mode: "local", port, @@ -142,7 +139,31 @@ export function maybeRegisterLocalWorker( durableObjectsHost: url.hostname, durableObjectsPort: port, entrypointAddresses: entrypointAddresses, - }); + }; +} + +export function maybeRegisterLocalWorker( + url: URL, + name: string | undefined, + internalDurableObjects: CfDurableObject[] | undefined, + entrypointAddresses: WorkerEntrypointsDefinition | undefined +) { + if (name === undefined) { + return; + } + + const definition = serializeWorkerRegistryDefinition( + url, + name, + internalDurableObjects, + entrypointAddresses + ); + + if (!definition) { + return; + } + + return registerWorker(name, definition); } export function Local(props: LocalProps) { diff --git a/packages/wrangler/templates/startDevWorker/ProxyWorker.ts b/packages/wrangler/templates/startDevWorker/ProxyWorker.ts index 39731d3f6755..5cbcb63d3df2 100644 --- a/packages/wrangler/templates/startDevWorker/ProxyWorker.ts +++ b/packages/wrangler/templates/startDevWorker/ProxyWorker.ts @@ -13,6 +13,7 @@ interface Env { PROXY_CONTROLLER: Fetcher; PROXY_CONTROLLER_AUTH_SECRET: string; DURABLE_OBJECT: DurableObjectNamespace; + NAME: string; } // request.cf.hostMetadata is verbose to type using the workers-types Request -- this allows us to have Request correctly typed in this scope @@ -35,7 +36,7 @@ export default { export class ProxyWorker implements DurableObject { constructor( readonly state: DurableObjectState, - readonly env: Env + public env: Env ) {} proxyData?: ProxyData; @@ -78,8 +79,8 @@ export class ProxyWorker implements DurableObject { }); } - processProxyControllerRequest(request: Request) { - const event = request.cf?.hostMetadata; + async processProxyControllerRequest(request: Request) { + const event = await request.json(); switch (event?.type) { case "pause": this.proxyData = undefined;