Skip to content

Commit

Permalink
tunnel server: check and close inactive connections (#227)
Browse files Browse the repository at this point in the history
  • Loading branch information
Roy Razon authored Sep 19, 2023
1 parent 6daf2ce commit 81650b1
Show file tree
Hide file tree
Showing 22 changed files with 589 additions and 182 deletions.
2 changes: 1 addition & 1 deletion packages/cli/src/tunnel-server-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export const connectToTunnelServerSsh = async ({ tunnelOpts, log, tunnelingKey,
log,
tunnelOpts,
clientPrivateKey: tunnelingKey,
username: process.env.USER || 'preview',
username: process.env.USER || 'preevy',
confirmHostFingerprint: async (...args) => {
spinner?.stop()
return await confirmHostFingerprint(...args)
Expand Down
1 change: 1 addition & 0 deletions packages/common/src/ssh/base-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ export const baseSshClient = async (
ssh.on('ready', () => resolve(result))
ssh.on('error', err => {
reject(err)
ssh.end()
})
ssh.connect({
debug: msg => log.debug(msg),
Expand Down
14 changes: 5 additions & 9 deletions packages/compose-tunnel-agent/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ const main = async () => {
log: sshLog,
})

sshClient.ssh.on('error', async err => {
log.error('ssh client error: %j', inspect(err))
await sshClient.end()
})

sshClient.ssh.on('close', () => {
if (!endRequested) {
log.error('ssh client closed unexpectedly')
Expand Down Expand Up @@ -147,17 +142,18 @@ const SHUTDOWN_TIMEOUT = 5000

void main().then(
({ end }) => {
['SIGTERM', 'SIGINT'].forEach(signal => {
process.once(signal, async () => {
log.info(`shutting down on ${signal}`)
['SIGTERM', 'SIGINT', 'uncaughtException'].forEach(signal => {
process.once(signal, async (...args) => {
const argsStr = args ? args.map(arg => inspect(arg)).join(', ') : undefined
log.warn(`shutting down on ${[signal, argsStr].filter(Boolean).join(': ')}`)
const endResult = await Promise.race([
end().then(() => true),
new Promise<void>(resolve => { setTimeout(resolve, SHUTDOWN_TIMEOUT) }),
])
if (!endResult) {
log.error(`timed out while waiting ${SHUTDOWN_TIMEOUT}ms for server to close, exiting`)
}
process.exit(0)
process.exit(1)
})
})
},
Expand Down
5 changes: 5 additions & 0 deletions packages/compose-tunnel-agent/src/ssh/tunnel-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ export const sshClient = async ({
connectionConfig,
})

ssh.on('error', err => {
log.error('ssh client error: %j', inspect(err))
// baseSshClient calls end
})

const currentForwards = new Map<string, Forward>()

ssh.on('unix connection', ({ socketPath: forwardRequestId }, accept, reject) => {
Expand Down
9 changes: 4 additions & 5 deletions tunnel-server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,17 @@ const saasPublicKey = createPublicKey(SAAS_PUBLIC_KEY)
const SAAS_JWT_ISSUER = process.env.SAAS_JWT_ISSUER ?? 'app.livecycle.run'

const activeTunnelStore = inMemoryActiveTunnelStore({ log })
const appSessionStore = cookieSessionStore({ domain: BASE_URL.hostname, schema: claimsSchema, keys: process.env.COOKIE_SECRETS?.split(' ') })
const sessionStore = cookieSessionStore({ domain: BASE_URL.hostname, schema: claimsSchema, keys: process.env.COOKIE_SECRETS?.split(' ') })
const loginUrl = new URL('/login', editUrl(BASE_URL, { hostname: `auth.${BASE_URL.hostname}` })).toString()
const app = createApp({
sessionStore: appSessionStore,
sessionStore,
activeTunnelStore,
baseUrl: BASE_URL,
proxy: proxy({
activeTunnelStore,
log,
loginUrl,
sessionStore: appSessionStore,
sessionStore,
saasPublicKey,
jwtSaasIssuer: SAAS_JWT_ISSUER,
baseHostname: BASE_URL.hostname,
Expand All @@ -65,7 +65,6 @@ const app = createApp({
jwtSaasIssuer: SAAS_JWT_ISSUER,
saasPublicKey,
})
const sshLogger = log.child({ name: 'ssh_server' })

const tunnelUrl = (
rootUrl: URL,
Expand All @@ -74,7 +73,7 @@ const tunnelUrl = (
) => editUrl(rootUrl, { hostname: `${activeTunnelStoreKey(clientId, tunnel)}.${rootUrl.hostname}` }).toString()

const sshServer = createSshServer({
log: sshLogger,
log: log.child({ name: 'ssh_server' }),
sshPrivateKey,
socketDir: '/tmp', // TODO
activeTunnelStore,
Expand Down
15 changes: 13 additions & 2 deletions tunnel-server/jest.config.cjs
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
/** @type {import('ts-jest').JestConfigWithTsJest} */
module.exports = {
preset: 'ts-jest',
preset: 'ts-jest/presets/default-esm',
testEnvironment: 'node',
testMatch: ['!dist/', '**/*.test.ts'],
};
extensionsToTreatAsEsm: ['.ts'],
transform: {
// '^.+\\.[tj]sx?$' to process js/ts with `ts-jest`
// '^.+\\.m?[tj]sx?$' to process js/ts/mjs/mts with `ts-jest`
'^.+\\.tsx?$': [
'ts-jest',
{
useESM: true,
},
],
},
}
9 changes: 8 additions & 1 deletion tunnel-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,28 @@
"iconv-lite": "^0.6.3",
"jose": "^4.14.4",
"lodash": "^4.17.21",
"node-fetch": "2.6.9",
"p-timeout": "^6.1.2",
"pino": "^8.11.0",
"pino-pretty": "^9.4.0",
"prom-client": "^14.2.0",
"ssh2": "^1.12.0",
"ts-node": "^10.9.1",
"ts-pattern": "^5.0.4",
"tseep": "^1.1.1",
"zod": "^3.21.4"
},
"engines": {
"node": ">=18.0.0"
},
"devDependencies": {
"@jest/globals": "^29.5.0",
"@types/content-type": "^1.1.5",
"@types/cookies": "^0.7.7",
"@types/http-proxy": "^1.17.9",
"@types/lodash": "^4.14.192",
"@types/node": "18",
"@types/node-fetch": "^2.6.4",
"@types/ssh2": "^1.11.8",
"@typescript-eslint/eslint-plugin": "^5.55.0",
"@typescript-eslint/parser": "^5.55.0",
Expand All @@ -40,7 +47,7 @@
"wait-for-expect": "^3.0.2"
},
"scripts": {
"test": "yarn jest",
"test": "yarn node --experimental-vm-modules $(yarn bin jest)",
"start": "ts-node ./index.ts",
"build": "tsc --noEmit",
"dev": "DEBUG=1 yarn nodemon ./index.ts",
Expand Down
5 changes: 3 additions & 2 deletions tunnel-server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,12 @@ export const app = ({ proxy, sessionStore, baseUrl, activeTunnelStore, log, logi
res.statusCode = 400
return { error: 'returnPath must be a relative path' }
}
const activeTunnel = await activeTunnelStore.get(envId)
if (!activeTunnel) {
const activeTunnelEntry = await activeTunnelStore.get(envId)
if (!activeTunnelEntry) {
res.statusCode = 404
return { error: 'unknown envId' }
}
const { value: activeTunnel } = activeTunnelEntry
const session = sessionStore(req.raw, res.raw, activeTunnel.publicKeyThumbprint)
if (!session.user) {
const auth = jwtAuthenticator(
Expand Down
68 changes: 68 additions & 0 deletions tunnel-server/src/events.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { EventEmitter } from 'tseep'
import { afterAll, beforeAll, beforeEach, describe, expect, it, jest } from '@jest/globals'
import { TimeoutError } from 'p-timeout'
import { onceWithTimeout } from './events'

describe('onceWithTimeout', () => {
beforeAll(() => {
jest.useFakeTimers()
})
afterAll(() => {
jest.useRealTimers()
})

let emitter: EventEmitter<{ foo: () => void; error: (err: Error) => void }>
beforeEach(() => {
emitter = new EventEmitter()
})

describe('when no timeout occurs', () => {
let p: Promise<void>
beforeEach(() => {
p = onceWithTimeout(emitter, 'foo', { milliseconds: 10 })
emitter.emit('foo')
})
it('resolves to undefined', async () => {
await expect(p).resolves.toBeUndefined()
})
})

describe('when an error is emitted', () => {
let p: Promise<void>
const e = new Error('boom')
beforeEach(() => {
p = onceWithTimeout(emitter, 'foo', { milliseconds: 10 })
emitter.emit('error', e)
})
it('rejects with the error', async () => {
await expect(p).rejects.toThrow(e)
})
})

describe('when a timeout occurs', () => {
describe('when no fallback is specified', () => {
let p: Promise<void>
beforeEach(() => {
p = onceWithTimeout(emitter, 'foo', { milliseconds: 10 })
jest.advanceTimersByTime(10)
})

it('rejects with a TimeoutError', async () => {
await expect(p).rejects.toThrow(TimeoutError)
await expect(p).rejects.toThrow('timed out after 10ms')
})
})

describe('when a fallback is specified', () => {
let p: Promise<12>
beforeEach(() => {
p = onceWithTimeout(emitter, 'foo', { milliseconds: 10, fallback: async () => 12 as const })
jest.advanceTimersByTime(10)
})

it('resolves with the fallback', async () => {
await expect(p).resolves.toBe(12)
})
})
})
})
36 changes: 36 additions & 0 deletions tunnel-server/src/events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import events from 'events'
import { TimeoutError } from 'p-timeout'

interface NodeEventTarget {
once(eventName: string | symbol, listener: (...args: unknown[]) => void): this
}

export async function onceWithTimeout(
target: NodeEventTarget,
event: string | symbol,
opts: { milliseconds: number },
): Promise<void>
export async function onceWithTimeout <T = unknown>(
target: NodeEventTarget,
event: string | symbol,
opts: { milliseconds: number; fallback: () => T | Promise<T> },
): Promise<T>
export async function onceWithTimeout <T = unknown>(
target: NodeEventTarget,
event: string | symbol,
{ milliseconds, fallback }: { milliseconds: number; fallback?: () => T | Promise<T> },
): Promise<T | void> {
const signal = AbortSignal.timeout(milliseconds)
return await events.once(target, event, { signal }).then(
() => undefined,
async e => {
if (!signal.aborted || (e as Error).name !== 'AbortError') {
throw e
}
if (fallback) {
return await fallback()
}
throw new TimeoutError(`timed out after ${milliseconds}ms`)
},
)
}
10 changes: 10 additions & 0 deletions tunnel-server/src/id-generator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export const idGenerator = () => {
let nextId = 0
return {
next: () => {
const result = nextId
nextId += 1
return result
},
}
}
53 changes: 53 additions & 0 deletions tunnel-server/src/memory-store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { Logger } from 'pino'
import { IEventEmitter, EventEmitter } from 'tseep'
import { nextTick } from 'process'
import { idGenerator } from './id-generator'

export class KeyAlreadyExistsError<V> extends Error {
constructor(readonly key: string, readonly value: V) {
super(`key already exists: "${key}"`)
}
}

export type TransactionDescriptor = { readonly txId: number | string }

type StoreEvents = {
delete: () => void
}

export type EntryWatcher = {
once: (event: 'delete', listener: () => void) => void
}

export const inMemoryStore = <V extends {}>({ log }: { log: Logger }) => {
type MapValue = { value: V; watcher: IEventEmitter<StoreEvents>; setTx: TransactionDescriptor }
const map = new Map<string, MapValue>()
const txIdGen = idGenerator()
return {
get: async (key: string) => {
const entry = map.get(key)
return entry === undefined ? undefined : { value: entry.value, watcher: entry.watcher }
},
set: async (key: string, value: V) => {
const existing = map.get(key)
if (existing !== undefined) {
throw new KeyAlreadyExistsError<V>(key, existing.value)
}
const tx: TransactionDescriptor = { txId: txIdGen.next() }
log.debug('setting key %s id %s: %j', key, tx.txId, value)
const watcher = new EventEmitter<StoreEvents>()
map.set(key, { value, watcher, setTx: tx })
return { tx, watcher: watcher as EntryWatcher }
},
delete: async (key: string, setTx?: TransactionDescriptor) => {
const value = map.get(key)
if (value && (setTx === undefined || value.setTx.txId === setTx.txId) && map.delete(key)) {
nextTick(() => { value.watcher.emit('delete') })
return true
}
return false
},
}
}

export type Store<V extends {}> = ReturnType<typeof inMemoryStore<V>>
Loading

0 comments on commit 81650b1

Please sign in to comment.