diff --git a/README.md b/README.md index 121c9b8..88962a2 100644 --- a/README.md +++ b/README.md @@ -152,8 +152,8 @@ This module has _partial_ support for forwarding websockets by passing a A few things are missing: 1. forwarding headers as well as `rewriteHeaders` -2. support for paths, `prefix` and `rewritePrefix` -3. request id logging +2. request id logging +3. support `ignoreTrailingSlash` Pull requests are welcome to finish this feature. diff --git a/index.d.ts b/index.d.ts index c2ec5b8..c8542d1 100644 --- a/index.d.ts +++ b/index.d.ts @@ -9,6 +9,11 @@ import { FastifyReplyFromOptions } from "fastify-reply-from" +import { + ClientOptions, + ServerOptions +} from "ws" + export interface FastifyHttpProxyOptions extends FastifyReplyFromOptions { upstream: string; prefix?: string; @@ -18,6 +23,9 @@ export interface FastifyHttpProxyOptions extends FastifyReplyFromOptions { beforeHandler?: preHandlerHookHandler; config?: Object; replyOptions?: Object; + websocket?: boolean + wsClientOptions?: ClientOptions + wsServerOptions?: ServerOptions } declare const fastifyHttpProxy: FastifyPlugin; diff --git a/index.js b/index.js index 52d7043..f35cfbe 100644 --- a/index.js +++ b/index.js @@ -1,12 +1,104 @@ 'use strict' const From = require('fastify-reply-from') -const WebSocketPlugin = require('fastify-websocket') const WebSocket = require('ws') -const { pipeline } = require('stream') -const nonWsMethods = ['DELETE', 'HEAD', 'PATCH', 'POST', 'PUT', 'OPTIONS'] -module.exports = async function (fastify, opts) { +const httpMethods = ['DELETE', 'GET', 'HEAD', 'PATCH', 'POST', 'PUT', 'OPTIONS'] + +function liftErrorCode (code) { + if (typeof code !== 'number') { + // Sometimes "close" event emits with a non-numeric value + return 1011 + } else if (code === 1004 || code === 1005 || code === 1006) { + // ws module forbid those error codes usage, lift to "application level" (4xxx) + return 4000 + (code % 1000) + } else { + return code + } +} + +function closeWebSocket (socket, code, reason) { + if (socket.readyState === WebSocket.OPEN) { + socket.close(liftErrorCode(code), reason) + } +} + +function waitConnection (socket, write) { + if (socket.readyState === WebSocket.CONNECTING) { + socket.once('open', write) + } else { + write() + } +} + +function proxyWebSockets (source, target) { + function close (code, reason) { + closeWebSocket(source, code, reason) + closeWebSocket(target, code, reason) + } + + source.on('message', data => waitConnection(target, () => target.send(data))) + source.on('ping', data => waitConnection(target, () => target.ping(data))) + source.on('pong', data => waitConnection(target, () => target.pong(data))) + source.on('close', close) + source.on('error', error => close(1011, error.message)) + source.on('unexpected-response', () => close(1011, 'unexpected response')) + + // source WebSocket is already connected because it is created by ws server + target.on('message', data => source.send(data)) + target.on('ping', data => source.ping(data)) + target.on('pong', data => source.pong(data)) + target.on('close', close) + target.on('error', error => close(1011, error.message)) + target.on('unexpected-response', () => close(1011, 'unexpected response')) +} + +function createWebSocketUrl (options, request) { + const source = new URL(request.url, 'http://127.0.0.1') + + const target = new URL( + options.rewritePrefix || options.prefix || source.pathname, + options.upstream + ) + + target.search = source.search + + return target +} + +function setupWebSocketProxy (fastify, options) { + const server = new WebSocket.Server({ + path: options.prefix, + server: fastify.server, + ...options.wsServerOptions + }) + + fastify.addHook('onClose', (instance, done) => server.close(done)) + + // To be able to close the HTTP server, + // all WebSocket clients need to be disconnected. + // Fastify is missing a pre-close event, or the ability to + // add a hook before the server.close call. We need to resort + // to monkeypatching for now. + const oldClose = fastify.server.close + fastify.server.close = function (done) { + for (const client of server.clients) { + client.close() + } + oldClose.call(this, done) + } + + server.on('connection', (source, request) => { + const url = createWebSocketUrl(options, request) + + const target = new WebSocket(url, options.wsClientOptions) + + fastify.log.debug({ url: url.href }, 'proxy websocket') + proxyWebSockets(source, target) + }) +} + +async function httpProxy (fastify, opts) { if (!opts.upstream) { throw new Error('upstream must be specified') } @@ -46,33 +138,16 @@ module.exports = async function (fastify, opts) { done(null, payload) } - if (opts.websocket) { - fastify.register(WebSocketPlugin, opts.websocket) - } - - fastify.get('/', { - preHandler, - config: opts.config || {}, - handler, - wsHandler - }) - fastify.get('/*', { - preHandler, - config: opts.config || {}, - handler, - wsHandler - }) - fastify.route({ url: '/', - method: nonWsMethods, + method: httpMethods, preHandler, config: opts.config || {}, handler }) fastify.route({ url: '/*', - method: nonWsMethods, + method: httpMethods, preHandler, config: opts.config || {}, handler @@ -84,21 +159,14 @@ module.exports = async function (fastify, opts) { reply.from(dest || '/', replyOpts) } - function wsHandler (conn, req) { - // TODO support paths and querystrings - // TODO support rewriteHeader - // TODO support rewritePrefix - const ws = new WebSocket(opts.upstream) - const stream = WebSocket.createWebSocketStream(ws) - - // TODO fastify-websocket should create a logger for each connection - fastify.log.info('starting websocket tunnel') - pipeline(conn, stream, conn, function (err) { - if (err) { - fastify.log.info({ err }, 'websocket tunnel terminated with error') - return - } - fastify.log.info('websocket tunnel terminated') - }) + if (opts.websocket) { + setupWebSocketProxy(fastify, opts) } } + +httpProxy[Symbol.for('plugin-meta')] = { + fastify: '^3.0.0', + name: 'fastify-http-proxy' +} + +module.exports = httpProxy diff --git a/package.json b/package.json index d80731e..981eb7b 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "homepage": "https://github.com/fastify/fastify-http-proxy#readme", "devDependencies": { "@types/node": "^14.0.27", + "@types/ws": "^7.4.0", "@typescript-eslint/parser": "^4.0.0", "eslint-plugin-typescript": "^0.14.0", "express": "^4.16.4", @@ -36,9 +37,10 @@ "http-errors": "^1.8.0", "http-proxy": "^1.17.0", "make-promises-safe": "^5.0.0", - "pre-commit": "^1.2.2", "simple-get": "^4.0.0", "snazzy": "^9.0.0", + "socket.io": "^3.0.4", + "socket.io-client": "^3.0.4", "standard": "^16.0.3", "tap": "^14.10.8", "tsd": "^0.14.0", @@ -46,8 +48,7 @@ }, "dependencies": { "fastify-reply-from": "^3.1.3", - "fastify-websocket": "^2.0.7", - "ws": "^7.3.1" + "ws": "^7.4.1" }, "tsd": { "directory": "test" diff --git a/test/socket.io.js b/test/socket.io.js new file mode 100644 index 0000000..1401eda --- /dev/null +++ b/test/socket.io.js @@ -0,0 +1,52 @@ +'use strict' + +const { test } = require('tap') +const Fastify = require('fastify') +const proxy = require('../') +const ioServer = require('socket.io') +const ioClient = require('socket.io-client') +const { createServer } = require('http') +const { promisify } = require('util') +const { once } = require('events') + +test('proxy socket.io', async t => { + t.plan(2) + + const srvUpstream = createServer() + t.tearDown(srvUpstream.close.bind(srvUpstream)) + + const srvSocket = new ioServer.Server(srvUpstream) + t.tearDown(srvSocket.close.bind(srvSocket)) + + await promisify(srvUpstream.listen.bind(srvUpstream))(0) + + const srvProxy = Fastify() + t.tearDown(srvProxy.close.bind(srvProxy)) + + srvProxy.register(proxy, { + upstream: `http://127.0.0.1:${srvUpstream.address().port}`, + websocket: true + }) + + await srvProxy.listen(0) + + srvSocket.on('connection', socket => { + socket.on('hello', data => { + t.is(data, 'world') + socket.emit('hi', 'socket') + }) + }) + + const cliSocket = ioClient(`http://127.0.0.1:${srvProxy.server.address().port}`) + t.tearDown(cliSocket.close.bind(cliSocket)) + + cliSocket.emit('hello', 'world') + + const out = await once(cliSocket, 'hi') + t.is(out[0], 'socket') + + await Promise.all([ + once(cliSocket, 'disconnect'), + srvProxy.close() + ]) +}) diff --git a/test/test.js b/test/test.js index 36f2901..378af8b 100644 --- a/test/test.js +++ b/test/test.js @@ -180,9 +180,9 @@ async function run () { t.deepEqual(reply.context.config, { foo: 'bar', url: '/*', - // GET is not there because of the nonWsMethods. method: [ 'DELETE', + 'GET', 'HEAD', 'PATCH', 'POST', diff --git a/test/websocket.js b/test/websocket.js index 5c00f4b..9a10003 100644 --- a/test/websocket.js +++ b/test/websocket.js @@ -46,4 +46,9 @@ test('basic websocket proxy', async (t) => { const [buf] = await once(stream, 'data') t.is(buf.toString(), 'hello') + + await Promise.all([ + once(ws, 'close'), + server.close() + ]) })