Skip to content

Commit

Permalink
convert cta http to fastify
Browse files Browse the repository at this point in the history
  • Loading branch information
Roy Razon committed Sep 6, 2023
1 parent 7cf16f2 commit e7a2fb1
Show file tree
Hide file tree
Showing 19 changed files with 311 additions and 414 deletions.
44 changes: 13 additions & 31 deletions packages/compose-tunnel-agent/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import fs from 'fs'
import path from 'path'
import Docker from 'dockerode'
import { inspect } from 'node:util'
import http from 'node:http'
import { rimraf } from 'rimraf'
import pino from 'pino'
import pinoPretty from 'pino-pretty'
import { EOL } from 'os'
import {
requiredEnv,
formatPublicKey,
Expand All @@ -16,9 +13,8 @@ import {
MachineStatusCommand,
COMPOSE_TUNNEL_AGENT_PORT,
} from '@preevy/common'
import createApiServerHandler from './src/api-server'
import { createApp } from './src/api-server'
import { sshClient as createSshClient } from './src/ssh'
import { tryHandler, tryUpgradeHandler } from './src/api-server/http-server-helpers'
import { runMachineStatusCommand } from './src/machine-status'
import { envMetadata } from './src/metadata'
import { readAllFiles } from './src/files'
Expand Down Expand Up @@ -54,7 +50,15 @@ const sshConnectionConfigFromEnv = async (): Promise<{ connectionConfig: SshConn
}
}

const writeLineToStdout = (s: string) => [s, EOL].forEach(d => process.stdout.write(d))
const fastifyListenArgsFromEnv = async () => {
const portOrPath = process.env.PORT ?? COMPOSE_TUNNEL_AGENT_PORT
const portNumber = Number(portOrPath)
if (typeof portOrPath === 'string' && Number.isNaN(portNumber)) {
await rimraf(portOrPath)
return { path: portOrPath }
}
return { port: portNumber, host: '0.0.0.0' }
}

const machineStatusCommand = process.env.MACHINE_STATUS_COMMAND
? JSON.parse(process.env.MACHINE_STATUS_COMMAND) as MachineStatusCommand
Expand Down Expand Up @@ -99,16 +103,10 @@ const main = async () => {
void dockerClient.startListening({
onChange: async services => {
currentTunnels = sshClient.updateTunnels(services)
void currentTunnels.then(ssh => writeLineToStdout(JSON.stringify(ssh)))
},
})

const apiListenAddress = process.env.PORT ?? COMPOSE_TUNNEL_AGENT_PORT
if (typeof apiListenAddress === 'string' && Number.isNaN(Number(apiListenAddress))) {
await rimraf(apiListenAddress)
}

const { handler, upgradeHandler } = createApiServerHandler({
const app = await createApp({
log: log.child({ name: 'api' }),
currentSshState: async () => (await currentTunnels),
machineStatus: machineStatusCommand
Expand All @@ -120,24 +118,8 @@ const main = async () => {
dockerFilter: dockerFilteredClient({ docker, composeProject: targetComposeProject }),
})

const httpLog = log.child({ name: 'http' })

const httpServer = http.createServer(tryHandler({ log: httpLog }, async (req, res) => {
httpLog.debug('request %s %s', req.method, req.url)
return await handler(req, res)
}))
.on('upgrade', tryUpgradeHandler({ log: httpLog }, async (req, socket, head) => {
httpLog.debug('upgrade %s %s', req.method, req.url)
return await upgradeHandler(req, socket, head)
}))
.listen(apiListenAddress, () => {
httpLog.info(`API server listening on ${inspect(httpServer.address())}`)
})
.on('error', err => {
httpLog.error(err)
process.exit(1)
})
.unref()
void app.listen({ ...await fastifyListenArgsFromEnv() })
app.server.unref()
}

void main();
Expand Down
6 changes: 5 additions & 1 deletion packages/compose-tunnel-agent/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,22 @@
},
"license": "Apache-2.0",
"dependencies": {
"@fastify/cors": "^8.3.0",
"@fastify/request-context": "^5.0.0",
"@fastify/websocket": "^8.2.0",
"@preevy/common": "0.0.50",
"dockerode": "^3.3.4",
"fastify": "^4.22.2",
"fastify-type-provider-zod": "^1.1.9",
"http-proxy": "^1.18.1",
"lodash": "^4.17.21",
"p-limit": "^3.1.0",
"pino": "^8.11.0",
"pino-pretty": "^9.4.0",
"rimraf": "^5.0.0",
"ssh2": "^1.12.0",
"ws": "^8.13.0"
"ws": "^8.13.0",
"zod": "^3.21.4"
},
"devDependencies": {
"@jest/globals": "^29.5.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { NotFoundError } from '../http-errors'

export class ContainerNotFoundError extends NotFoundError {
constructor(containerId: string) {
super(`Container "${containerId}" does not exist or is not managed by this agent`)
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
import { inspect } from 'util'
import { createWebSocketStream } from 'ws'
import { parseQueryParams, queryParamBoolean } from '../../query-params'
import { wsHandler } from '../handler'
import { NotFoundError } from '../../http-server-helpers'

const handler = wsHandler(
/^\/container\/([^/?]+)\/exec($|\?)/,
async (ws, req, match, { log, docker, dockerFilter }) => {
const containerId = match[1]
if (!await dockerFilter.inspectContainer(containerId)) {
throw new NotFoundError()
}
const { obj: { tty: ttyQueryParam }, search } = parseQueryParams(req.url ?? '', { tty: true })
const cmdQueryParams = search.getAll('cmd')
const cmd = cmdQueryParams.length ? cmdQueryParams : ['sh']
import z from 'zod'
import { FastifyPluginAsync } from 'fastify'
import Dockerode from 'dockerode'
import { DockerFilterClient } from '../../docker'
import { containerIdSchema, execQueryString } from './schema'
import { inspectFilteredContainer } from './filter'

const tty = queryParamBoolean(ttyQueryParam)
const handler: FastifyPluginAsync<{
docker: Dockerode
dockerFilter: DockerFilterClient
}> = async (app, { docker, dockerFilter }) => {
app.get<{
Params: z.infer<typeof containerIdSchema>
Querystring: z.infer<typeof execQueryString>
}>('/:containerId/exec', {
schema: {
params: containerIdSchema,
querystring: execQueryString,
},
websocket: true,
}, async (connection, { params: { containerId }, query: { tty, cmd }, log }) => {
await inspectFilteredContainer(dockerFilter, containerId)
const abort = new AbortController()
const exec = await docker.getContainer(containerId).exec({
AttachStdin: true,
Expand All @@ -32,17 +38,17 @@ const handler = wsHandler(
Tty: tty,
})

execStream.on('close', () => { ws.close() })
execStream.on('close', () => { connection.socket.close() })
execStream.on('error', err => { log.warn('execStream error %j', inspect(err)) })
ws.on('close', () => {
connection.socket.on('close', () => {
abort.abort()
execStream.destroy()
})

const inspectResults = await exec.inspect()
log.debug('exec %s: %j', containerId, inspect(inspectResults))

const wsStream = createWebSocketStream(ws)
const wsStream = createWebSocketStream(connection.socket)
wsStream.on('error', err => {
const level = err.message === 'aborted' || err.message.includes('WebSocket is not open') ? 'debug' : 'warn'
log[level]('wsStream error %j', inspect(err))
Expand All @@ -56,7 +62,7 @@ const handler = wsHandler(
}

return undefined
},
)
})
}

export default handler
10 changes: 10 additions & 0 deletions packages/compose-tunnel-agent/src/api-server/containers/filter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { DockerFilterClient } from '../../docker'
import { ContainerNotFoundError } from './errors'

export const inspectFilteredContainer = async (dockerFilter: DockerFilterClient, containerId: string) => {
const container = await dockerFilter.inspectContainer(containerId)
if (!container) {
throw new ContainerNotFoundError(containerId)
}
return container
}
31 changes: 31 additions & 0 deletions packages/compose-tunnel-agent/src/api-server/containers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import Dockerode from 'dockerode'
import { FastifyPluginAsync } from 'fastify'
import z from 'zod'
import fastifyWebsocket from '@fastify/websocket'
import { DockerFilterClient } from '../../docker'
import { containerIdSchema } from './schema'
import exec from './exec'
import logs from './logs'
import { inspectFilteredContainer } from './filter'

export const containers: FastifyPluginAsync<{
docker: Dockerode
dockerFilter: DockerFilterClient
}> = async (app, { docker, dockerFilter }) => {
app.get('/', async () => await dockerFilter.listContainers())

app.get<{
Params: z.infer<typeof containerIdSchema>
}>('/:containerId', {
schema: {
params: containerIdSchema,
},
}, async ({ params: { containerId } }, res) => {
const container = await inspectFilteredContainer(dockerFilter, containerId)
void res.send(container)
})

await app.register(fastifyWebsocket)
await app.register(exec, { docker, dockerFilter })
await app.register(logs, { docker, dockerFilter })
}
56 changes: 56 additions & 0 deletions packages/compose-tunnel-agent/src/api-server/containers/logs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { inspect } from 'util'
import { createWebSocketStream } from 'ws'
import z from 'zod'
import { FastifyPluginAsync } from 'fastify'
import Dockerode from 'dockerode'
import { DockerFilterClient } from '../../docker'
import { containerIdSchema, logsQueryString } from './schema'
import { inspectFilteredContainer } from './filter'

const handler: FastifyPluginAsync<{
docker: Dockerode
dockerFilter: DockerFilterClient
}> = async (app, { docker, dockerFilter }) => {
app.get<{
Params: z.infer<typeof containerIdSchema>
Querystring: z.infer<typeof logsQueryString>
}>('/:containerId/logs', {
schema: {
params: containerIdSchema,
querystring: logsQueryString,
},
websocket: true,
}, async (
connection,
{ params: { containerId }, query: { stdout, stderr, since, until, timestamps, tail }, log }
) => {
await inspectFilteredContainer(dockerFilter, containerId)
const abort = new AbortController()
const logStream = await docker.getContainer(containerId).logs({
stdout,
stderr,
since,
until,
timestamps,
tail,
follow: true,
abortSignal: abort.signal,
})

logStream.on('close', async () => { connection.socket.close() })
logStream.on('error', err => {
if (err.message !== 'aborted') {
log.error('logs stream error %j', inspect(err))
}
})
connection.socket.on('close', () => { abort.abort() })

const wsStream = createWebSocketStream(connection.socket)
wsStream.on('error', err => { log.error('wsStream error %j', inspect(err)) })
docker.modem.demuxStream(logStream, wsStream, wsStream)

return undefined
})
}

export default handler
19 changes: 19 additions & 0 deletions packages/compose-tunnel-agent/src/api-server/containers/schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import z from 'zod'

export const containerIdSchema = z.object({
containerId: z.string(),
})

export const execQueryString = z.object({
cmd: z.array(z.string()).optional().default(['sh']),
tty: z.coerce.boolean().optional().default(true),
})

export const logsQueryString = z.object({
stdout: z.coerce.boolean().optional(),
stderr: z.coerce.boolean().optional(),
since: z.string().optional(),
until: z.string().optional(),
timestamps: z.coerce.boolean().optional(),
tail: z.coerce.number().optional(),
})
32 changes: 32 additions & 0 deletions packages/compose-tunnel-agent/src/api-server/env.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import fs from 'node:fs'
import { FastifyPluginAsync } from 'fastify'
import { SshState } from '../ssh'

export const env: FastifyPluginAsync<{
currentSshState: () => Promise<SshState>
machineStatus?: () => Promise<{ data: Buffer; contentType: string }>
envMetadata?: Record<string, unknown>
composeModelPath: string
}> = async (app, { currentSshState, machineStatus, envMetadata, composeModelPath }) => {
app.get('/tunnels', async () => await currentSshState())

if (machineStatus) {
app.get('/machine-status', async (_req, res) => {
const { data, contentType } = await machineStatus()
void res
.header('Content-Type', contentType)
.send(data)
})
}

if (envMetadata) {
app.get('/env-metadata', async () => envMetadata)
}

app.get('/compose-model', async ({ log }, res) => {
log.debug('compose-model handler')
void res
.header('Content-Type', 'application/x-yaml')
.send(await fs.promises.readFile(composeModelPath, { encoding: 'utf-8' }))
})

Check failure

Code scanning / CodeQL

Missing rate limiting High

This route handler performs
a file system access
, but is not rate-limited.
}
13 changes: 0 additions & 13 deletions packages/compose-tunnel-agent/src/api-server/errors.ts

This file was deleted.

Loading

0 comments on commit e7a2fb1

Please sign in to comment.