Skip to content

Commit

Permalink
Remove fastify-websocket, proxy ws events directly (#122)
Browse files Browse the repository at this point in the history
* Drop fastify-websocket

* Add socket.io tests

* Disconnect websockets when server is closing

* Remove rejectUnauthorized default value

* Add plugin metadata

* Actually test socket.io proxy

* Fix ws server closing

* Test ws clients close event

* Add missing tearDown
  • Loading branch information
greguz authored Dec 18, 2020
1 parent bd82f1a commit 0f7b501
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 45 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ This module has _partial_ support for forwarding websockets by passing a
A few things are missing:

1. forwarding headers as well as `rewriteHeaders`
2. support for paths, `prefix` and `rewritePrefix`
3. request id logging
2. request id logging
3. support `ignoreTrailingSlash`

Pull requests are welcome to finish this feature.

Expand Down
8 changes: 8 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ import {
FastifyReplyFromOptions
} from "fastify-reply-from"

import {
ClientOptions,
ServerOptions
} from "ws"

export interface FastifyHttpProxyOptions extends FastifyReplyFromOptions {
upstream: string;
prefix?: string;
Expand All @@ -18,6 +23,9 @@ export interface FastifyHttpProxyOptions extends FastifyReplyFromOptions {
beforeHandler?: preHandlerHookHandler;
config?: Object;
replyOptions?: Object;
websocket?: boolean
wsClientOptions?: ClientOptions
wsServerOptions?: ServerOptions
}

declare const fastifyHttpProxy: FastifyPlugin<FastifyHttpProxyOptions>;
Expand Down
146 changes: 107 additions & 39 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,104 @@
'use strict'

const From = require('fastify-reply-from')
const WebSocketPlugin = require('fastify-websocket')
const WebSocket = require('ws')
const { pipeline } = require('stream')
const nonWsMethods = ['DELETE', 'HEAD', 'PATCH', 'POST', 'PUT', 'OPTIONS']

module.exports = async function (fastify, opts) {
const httpMethods = ['DELETE', 'GET', 'HEAD', 'PATCH', 'POST', 'PUT', 'OPTIONS']

function liftErrorCode (code) {
if (typeof code !== 'number') {
// Sometimes "close" event emits with a non-numeric value
return 1011
} else if (code === 1004 || code === 1005 || code === 1006) {
// ws module forbid those error codes usage, lift to "application level" (4xxx)
return 4000 + (code % 1000)
} else {
return code
}
}

function closeWebSocket (socket, code, reason) {
if (socket.readyState === WebSocket.OPEN) {
socket.close(liftErrorCode(code), reason)
}
}

function waitConnection (socket, write) {
if (socket.readyState === WebSocket.CONNECTING) {
socket.once('open', write)
} else {
write()
}
}

function proxyWebSockets (source, target) {
function close (code, reason) {
closeWebSocket(source, code, reason)
closeWebSocket(target, code, reason)
}

source.on('message', data => waitConnection(target, () => target.send(data)))
source.on('ping', data => waitConnection(target, () => target.ping(data)))
source.on('pong', data => waitConnection(target, () => target.pong(data)))
source.on('close', close)
source.on('error', error => close(1011, error.message))
source.on('unexpected-response', () => close(1011, 'unexpected response'))

// source WebSocket is already connected because it is created by ws server
target.on('message', data => source.send(data))
target.on('ping', data => source.ping(data))
target.on('pong', data => source.pong(data))
target.on('close', close)
target.on('error', error => close(1011, error.message))
target.on('unexpected-response', () => close(1011, 'unexpected response'))
}

function createWebSocketUrl (options, request) {
const source = new URL(request.url, 'http://127.0.0.1')

const target = new URL(
options.rewritePrefix || options.prefix || source.pathname,
options.upstream
)

target.search = source.search

return target
}

function setupWebSocketProxy (fastify, options) {
const server = new WebSocket.Server({
path: options.prefix,
server: fastify.server,
...options.wsServerOptions
})

fastify.addHook('onClose', (instance, done) => server.close(done))

// To be able to close the HTTP server,
// all WebSocket clients need to be disconnected.
// Fastify is missing a pre-close event, or the ability to
// add a hook before the server.close call. We need to resort
// to monkeypatching for now.
const oldClose = fastify.server.close
fastify.server.close = function (done) {
for (const client of server.clients) {
client.close()
}
oldClose.call(this, done)
}

server.on('connection', (source, request) => {
const url = createWebSocketUrl(options, request)

const target = new WebSocket(url, options.wsClientOptions)

fastify.log.debug({ url: url.href }, 'proxy websocket')
proxyWebSockets(source, target)
})
}

async function httpProxy (fastify, opts) {
if (!opts.upstream) {
throw new Error('upstream must be specified')
}
Expand Down Expand Up @@ -46,33 +138,16 @@ module.exports = async function (fastify, opts) {
done(null, payload)
}

if (opts.websocket) {
fastify.register(WebSocketPlugin, opts.websocket)
}

fastify.get('/', {
preHandler,
config: opts.config || {},
handler,
wsHandler
})
fastify.get('/*', {
preHandler,
config: opts.config || {},
handler,
wsHandler
})

fastify.route({
url: '/',
method: nonWsMethods,
method: httpMethods,
preHandler,
config: opts.config || {},
handler
})
fastify.route({
url: '/*',
method: nonWsMethods,
method: httpMethods,
preHandler,
config: opts.config || {},
handler
Expand All @@ -84,21 +159,14 @@ module.exports = async function (fastify, opts) {
reply.from(dest || '/', replyOpts)
}

function wsHandler (conn, req) {
// TODO support paths and querystrings
// TODO support rewriteHeader
// TODO support rewritePrefix
const ws = new WebSocket(opts.upstream)
const stream = WebSocket.createWebSocketStream(ws)

// TODO fastify-websocket should create a logger for each connection
fastify.log.info('starting websocket tunnel')
pipeline(conn, stream, conn, function (err) {
if (err) {
fastify.log.info({ err }, 'websocket tunnel terminated with error')
return
}
fastify.log.info('websocket tunnel terminated')
})
if (opts.websocket) {
setupWebSocketProxy(fastify, opts)
}
}

httpProxy[Symbol.for('plugin-meta')] = {
fastify: '^3.0.0',
name: 'fastify-http-proxy'
}

module.exports = httpProxy
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"homepage": "https://github.com/fastify/fastify-http-proxy#readme",
"devDependencies": {
"@types/node": "^14.0.27",
"@types/ws": "^7.4.0",
"@typescript-eslint/parser": "^4.0.0",
"eslint-plugin-typescript": "^0.14.0",
"express": "^4.16.4",
Expand All @@ -36,18 +37,18 @@
"http-errors": "^1.8.0",
"http-proxy": "^1.17.0",
"make-promises-safe": "^5.0.0",
"pre-commit": "^1.2.2",
"simple-get": "^4.0.0",
"snazzy": "^9.0.0",
"socket.io": "^3.0.4",
"socket.io-client": "^3.0.4",
"standard": "^16.0.3",
"tap": "^14.10.8",
"tsd": "^0.14.0",
"typescript": "^4.0.2"
},
"dependencies": {
"fastify-reply-from": "^3.1.3",
"fastify-websocket": "^2.0.7",
"ws": "^7.3.1"
"ws": "^7.4.1"
},
"tsd": {
"directory": "test"
Expand Down
52 changes: 52 additions & 0 deletions test/socket.io.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
'use strict'

const { test } = require('tap')
const Fastify = require('fastify')
const proxy = require('../')
const ioServer = require('socket.io')
const ioClient = require('socket.io-client')
const { createServer } = require('http')
const { promisify } = require('util')
const { once } = require('events')

test('proxy socket.io', async t => {
t.plan(2)

const srvUpstream = createServer()
t.tearDown(srvUpstream.close.bind(srvUpstream))

const srvSocket = new ioServer.Server(srvUpstream)
t.tearDown(srvSocket.close.bind(srvSocket))

await promisify(srvUpstream.listen.bind(srvUpstream))(0)

const srvProxy = Fastify()
t.tearDown(srvProxy.close.bind(srvProxy))

srvProxy.register(proxy, {
upstream: `http://127.0.0.1:${srvUpstream.address().port}`,
websocket: true
})

await srvProxy.listen(0)

srvSocket.on('connection', socket => {
socket.on('hello', data => {
t.is(data, 'world')
socket.emit('hi', 'socket')
})
})

const cliSocket = ioClient(`http://127.0.0.1:${srvProxy.server.address().port}`)
t.tearDown(cliSocket.close.bind(cliSocket))

cliSocket.emit('hello', 'world')

const out = await once(cliSocket, 'hi')
t.is(out[0], 'socket')

await Promise.all([
once(cliSocket, 'disconnect'),
srvProxy.close()
])
})
2 changes: 1 addition & 1 deletion test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ async function run () {
t.deepEqual(reply.context.config, {
foo: 'bar',
url: '/*',
// GET is not there because of the nonWsMethods.
method: [
'DELETE',
'GET',
'HEAD',
'PATCH',
'POST',
Expand Down
5 changes: 5 additions & 0 deletions test/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,9 @@ test('basic websocket proxy', async (t) => {
const [buf] = await once(stream, 'data')

t.is(buf.toString(), 'hello')

await Promise.all([
once(ws, 'close'),
server.close()
])
})

0 comments on commit 0f7b501

Please sign in to comment.