Skip to content

Commit

Permalink
tunnel server: more aggressive destroy
Browse files Browse the repository at this point in the history
  • Loading branch information
Roy Razon committed Sep 11, 2023
1 parent d7a1eee commit aa1c53b
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 8 deletions.
3 changes: 2 additions & 1 deletion tunnel-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
"jest": "^29.4.3",
"nodemon": "^2.0.20",
"ts-jest": "^29.1.0",
"typescript": "^5.0.4"
"typescript": "^5.0.4",
"wait-for-expect": "^3.0.2"
},
"scripts": {
"test": "yarn jest",
Expand Down
92 changes: 92 additions & 0 deletions tunnel-server/src/destroy-server.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/* eslint-disable jest/no-standalone-expect */
import net from 'node:net'
import events from 'node:events'
import { describe, it, expect, beforeEach, afterEach } from '@jest/globals'
import { promisify } from 'node:util'
import waitForExpect from 'wait-for-expect'
import { createDestroy } from './destroy-server'

describe('createDestroy', () => {
let server: net.Server
let port: number
let serverSockets: net.Socket[]
let destroy: (cb?: () => void) => void
let closed: boolean
beforeEach(async () => {
serverSockets = []
server = net.createServer(socket => {
serverSockets.push(socket)
socket.on('data', data => socket.write(data))
socket.on('error', () => undefined)
socket.unref()
})
destroy = createDestroy(server)
server.listen({ port: 0, host: '127.0.0.1' })
server.unref()
closed = false
server.once('close', () => { closed = true })
await events.once(server, 'listening')
port = (server.address() as net.AddressInfo).port
})

afterEach(async () => {
if (server.listening) {
server.close()
}
})

describe('when no sockets are connected', () => {
describe('and only close is called', () => {
beforeEach(() => {
server.close()
})

it('emits the close event immediately', async () => {
await waitForExpect(async () => {
expect(closed).toBe(true)
}, 1000, 100)
})
})
})

describe('when sockets are connected', () => {
let socket: net.Socket
beforeEach(async () => {
socket = net.connect({ host: '127.0.0.1', port, keepAlive: true, noDelay: true })
socket.on('error', () => undefined)
await events.once(socket, 'connect')
socket.unref()

// flood socket
do {
// no-op
} while (socket.write('hello'))
await promisify(socket.write.bind(socket))('hello')
})

describe('and only close is called', () => {
beforeEach(async () => {
expect(closed).toBe(false)
server.close()
await new Promise<void>(resolve => { setTimeout(resolve, 100) })
})

it('does not emit the close event immediately', async () => {
expect(closed).toBe(false)
})
})

describe('and destroy is called', () => {
beforeEach(async () => {
expect(closed).toBe(false)
destroy()
})

it('emits the close event immediately', async () => {
await waitForExpect(async () => {
expect(closed).toBe(true)
}, 1000, 100)
})
})
})
})
22 changes: 22 additions & 0 deletions tunnel-server/src/destroy-server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
export type Destroyable = {
destroy: () => void
on: (event: 'close', cb: () => void) => void
}

export type DestroyableServer<T extends Destroyable> = {
on: (event: 'connection', cb: (socket: T) => void) => void
close: (cb?: (err?: Error) => void) => void
}

export const createDestroy = <T extends Destroyable>(server: DestroyableServer<T>) => {
const connections = new Set<T>()
server.on('connection', socket => {
connections.add(socket)
socket.on('close', () => { connections.delete(socket) })
})

return (cb?: (err?: Error) => void) => {
server.close(cb)
connections.forEach(socket => { socket.destroy() })
}
}
15 changes: 8 additions & 7 deletions tunnel-server/src/ssh/base-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import ssh2, { ParsedKey, SocketBindInfo } from 'ssh2'
import { inspect } from 'util'
import EventEmitter from 'node:events'
import { ForwardRequest, parseForwardRequest } from '../forward-request'
import { createDestroy } from '../destroy-server'

const clientIdFromPublicSsh = (key: Buffer) =>
crypto.createHash('sha1').update(key).digest('base64url').replace(/[_-]/g, '')
Expand Down Expand Up @@ -94,7 +95,7 @@ export const baseSshServer = (
socketDir: string
}
): BaseSshServer => {
const serverEmitter = new EventEmitter() as Omit<BaseSshServer, 'close' | 'listen'>
const serverEmitter = new EventEmitter({ captureRejections: true }) as Omit<BaseSshServer, 'close' | 'listen'>
const server = new ssh2.Server(
{
// debug: x => log.debug(x),
Expand Down Expand Up @@ -129,7 +130,7 @@ export const baseSshServer = (
return
}

preevySshClient = Object.assign(new EventEmitter(), {
preevySshClient = Object.assign(new EventEmitter({ captureRejections: true }), {
publicKey: keyOrError,
clientId: clientIdFromPublicSsh(keyOrError.getPublicSSH()),
envId: ctx.username,
Expand Down Expand Up @@ -206,7 +207,7 @@ export const baseSshServer = (
)
})

const closeSocketServer = () => socketServer.close()
const destroySocketServer = createDestroy(socketServer)

socketServer
.listen(socketPath, () => {
Expand All @@ -217,17 +218,17 @@ export const baseSshServer = (
})
.on('error', (err: unknown) => {
log.error('socketServer request %j error: %j', request, err)
socketServer.close()
destroySocketServer()
rejectForward(err)
})
.on('close', () => {
log.debug('socketServer close: %j', socketPath)
socketServers.delete(request)
client.removeListener('close', closeSocketServer)
client.removeListener('close', destroySocketServer)
})

client.once('close', closeSocketServer)
client.once('end', closeSocketServer)
client.once('close', destroySocketServer)
client.once('end', destroySocketServer)
}),
(reason: Error) => {
log.error('[email protected]: rejecting %j, reason: %j', request, inspect(reason))
Expand Down
5 changes: 5 additions & 0 deletions tunnel-server/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3703,6 +3703,11 @@ v8-to-istanbul@^9.0.1:
"@types/istanbul-lib-coverage" "^2.0.1"
convert-source-map "^1.6.0"

wait-for-expect@^3.0.2:
version "3.0.2"
resolved "https://registry.yarnpkg.com/wait-for-expect/-/wait-for-expect-3.0.2.tgz#d2f14b2f7b778c9b82144109c8fa89ceaadaa463"
integrity sha512-cfS1+DZxuav1aBYbaO/kE06EOS8yRw7qOFoD3XtjTkYvCvh3zUvNST8DXK/nPaeqIzIv3P3kL3lRJn8iwOiSag==

walker@^1.0.8:
version "1.0.8"
resolved "https://registry.yarnpkg.com/walker/-/walker-1.0.8.tgz#bd498db477afe573dc04185f011d3ab8a8d7653f"
Expand Down

0 comments on commit aa1c53b

Please sign in to comment.