Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cta: narrow down docker API #209

Merged
merged 3 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/cli/src/commands/logs.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import yaml from 'yaml'
import { Args, Flags, Interfaces } from '@oclif/core'
import {
COMPOSE_TUNNEL_AGENT_SERVICE_NAME, addBaseComposeTunnelAgentService,
addBaseComposeTunnelAgentService,
localComposeClient, wrapWithDockerSocket, findEnvId, MachineConnection, ComposeModel, remoteUserModel,
} from '@preevy/core'
import { COMPOSE_TUNNEL_AGENT_SERVICE_NAME } from '@preevy/common'
import DriverCommand from '../driver-command'
import { envIdFlags } from '../common-flags'

Expand Down
1 change: 1 addition & 0 deletions packages/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ export { Logger } from './src/log'
export { requiredEnv, numberFromEnv } from './src/env'
export { tunnelNameResolver, TunnelNameResolver } from './src/tunnel-name'
export { editUrl } from './src/url'
export * from './src/compose-tunnel-agent'
export { MachineStatusCommand, DockerMachineStatusCommandRecipe } from './src/machine-status-command'
export { ProcessOutputBuffers, orderedOutput, OrderedOutput } from './src/process-output-buffers'
10 changes: 10 additions & 0 deletions packages/common/src/compose-tunnel-agent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export const COMPOSE_TUNNEL_AGENT_SERVICE_LABELS = {
PROFILE_THUMBPRINT: 'preevy.profile_thumbprint',
PRIVATE_MODE: 'preevy.private_mode',
ENV_ID: 'preevy.env_id',
ACCESS: 'preevy.access',
EXPOSE: 'preevy.expose',
}

export const COMPOSE_TUNNEL_AGENT_SERVICE_NAME = 'preevy_proxy'
export const COMPOSE_TUNNEL_AGENT_PORT = 3000
85 changes: 34 additions & 51 deletions packages/compose-tunnel-agent/index.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,31 @@
import fs from 'fs'
import path from 'path'
import Docker from 'dockerode'
import { inspect } from 'node:util'
import http from 'node:http'
import { rimraf } from 'rimraf'
import pino from 'pino'
import pinoPretty from 'pino-pretty'
import { EOL } from 'os'
import {
requiredEnv,
formatPublicKey,
parseSshUrl,
SshConnectionConfig,
tunnelNameResolver,
MachineStatusCommand,
COMPOSE_TUNNEL_AGENT_PORT,
} from '@preevy/common'
import createDockerClient from './src/docker'
import createApiServerHandler from './src/http/api-server'
import { createApp } from './src/api-server'
import { sshClient as createSshClient } from './src/ssh'
import { createDockerProxyHandlers } from './src/http/docker-proxy'
import { tryHandler, tryUpgradeHandler } from './src/http/http-server-helpers'
import { httpServerHandlers } from './src/http'
import { runMachineStatusCommand } from './src/machine-status'
import { envMetadata } from './src/metadata'
import { readAllFiles } from './src/files'
import { eventsClient as dockerEventsClient, filteredClient as dockerFilteredClient } from './src/docker'

const homeDir = process.env.HOME || '/root'
const dockerSocket = '/var/run/docker.sock'

const targetComposeProject = process.env.COMPOSE_PROJECT
const defaultAccess = process.env.DEFAULT_ACCESS_LEVEL === 'private' ? 'private' : 'public'

const sshConnectionConfigFromEnv = async (): Promise<{ connectionConfig: SshConnectionConfig; sshUrl: string }> => {
const sshUrl = requiredEnv('SSH_URL')
const parsed = parseSshUrl(sshUrl)
Expand All @@ -52,7 +50,15 @@ const sshConnectionConfigFromEnv = async (): Promise<{ connectionConfig: SshConn
}
}

const writeLineToStdout = (s: string) => [s, EOL].forEach(d => process.stdout.write(d))
const fastifyListenArgsFromEnv = async () => {
const portOrPath = process.env.PORT ?? COMPOSE_TUNNEL_AGENT_PORT
const portNumber = Number(portOrPath)
if (typeof portOrPath === 'string' && Number.isNaN(portNumber)) {
await rimraf(portOrPath)
return { path: portOrPath }
}
return { port: portNumber, host: '0.0.0.0' }
}

const machineStatusCommand = process.env.MACHINE_STATUS_COMMAND
? JSON.parse(process.env.MACHINE_STATUS_COMMAND) as MachineStatusCommand
Expand All @@ -72,7 +78,13 @@ const main = async () => {
})

const docker = new Docker({ socketPath: dockerSocket })
const dockerClient = createDockerClient({ log: log.child({ name: 'docker' }), docker, debounceWait: 500 })
const dockerClient = dockerEventsClient({
log: log.child({ name: 'docker' }),
docker,
debounceWait: 500,
defaultAccess,
composeProject: targetComposeProject,
})

const sshLog = log.child({ name: 'ssh' })
const sshClient = await createSshClient({
Expand All @@ -91,52 +103,23 @@ const main = async () => {
void dockerClient.startListening({
onChange: async services => {
currentTunnels = sshClient.updateTunnels(services)
void currentTunnels.then(ssh => writeLineToStdout(JSON.stringify(ssh)))
},
})

const apiListenAddress = process.env.PORT ?? 3000
if (typeof apiListenAddress === 'string' && Number.isNaN(Number(apiListenAddress))) {
await rimraf(apiListenAddress)
}

const { handler, upgradeHandler } = httpServerHandlers({
log: log.child({ name: 'http' }),
apiHandler: createApiServerHandler({
log: log.child({ name: 'api' }),
currentSshState: async () => (await currentTunnels),
machineStatus: machineStatusCommand
? async () => await runMachineStatusCommand({ log, docker })(machineStatusCommand)
: undefined,
envMetadata: await envMetadata({ env: process.env, log }),
composeModelPath: '/preevy/docker-compose.yaml',
}),
dockerProxyHandlers: createDockerProxyHandlers({
log: log.child({ name: 'docker-proxy' }),
dockerSocket,
docker,
}),
dockerProxyPrefix: '/docker/',
const app = await createApp({
log: log.child({ name: 'api' }),
currentSshState: async () => (await currentTunnels),
machineStatus: machineStatusCommand
? async () => await runMachineStatusCommand({ log, docker })(machineStatusCommand)
: undefined,
envMetadata: await envMetadata({ env: process.env, log }),
composeModelPath: '/preevy/docker-compose.yaml',
docker,
dockerFilter: dockerFilteredClient({ docker, composeProject: targetComposeProject }),
})

const httpLog = log.child({ name: 'http' })

const httpServer = http.createServer(tryHandler({ log: httpLog }, async (req, res) => {
httpLog.debug('request %s %s', req.method, req.url)
return await handler(req, res)
}))
.on('upgrade', tryUpgradeHandler({ log: httpLog }, async (req, socket, head) => {
httpLog.debug('upgrade %s %s', req.method, req.url)
return await upgradeHandler(req, socket, head)
}))
.listen(apiListenAddress, () => {
httpLog.info(`API server listening on ${inspect(httpServer.address())}`)
})
.on('error', err => {
httpLog.error(err)
process.exit(1)
})
.unref()
void app.listen({ ...await fastifyListenArgsFromEnv() })
app.server.unref()
}

void main();
Expand Down
8 changes: 7 additions & 1 deletion packages/compose-tunnel-agent/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,22 @@
},
"license": "Apache-2.0",
"dependencies": {
"@fastify/cors": "^8.3.0",
"@fastify/request-context": "^5.0.0",
"@fastify/websocket": "^8.2.0",
"@preevy/common": "0.0.50",
"dockerode": "^3.3.4",
"fastify": "^4.22.2",
"fastify-type-provider-zod": "^1.1.9",
"http-proxy": "^1.18.1",
"lodash": "^4.17.21",
"p-limit": "^3.1.0",
"pino": "^8.11.0",
"pino-pretty": "^9.4.0",
"rimraf": "^5.0.0",
"ssh2": "^1.12.0",
"ws": "^8.13.0"
"ws": "^8.13.0",
"zod": "^3.21.4"
},
"devDependencies": {
"@jest/globals": "^29.5.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { NotFoundError } from '../http-errors'

export class ContainerNotFoundError extends NotFoundError {
constructor(containerId: string) {
super(`Container "${containerId}" does not exist or is not managed by this agent`)
}
}
68 changes: 68 additions & 0 deletions packages/compose-tunnel-agent/src/api-server/containers/exec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { inspect } from 'util'
import { createWebSocketStream } from 'ws'
import z from 'zod'
import { FastifyPluginAsync } from 'fastify'
import Dockerode from 'dockerode'
import { DockerFilterClient } from '../../docker'
import { containerIdSchema, execQueryString } from './schema'
import { inspectFilteredContainer } from './filter'

const handler: FastifyPluginAsync<{
docker: Dockerode
dockerFilter: DockerFilterClient
}> = async (app, { docker, dockerFilter }) => {
app.get<{
Params: z.infer<typeof containerIdSchema>
Querystring: z.infer<typeof execQueryString>
}>('/:containerId/exec', {
schema: {
params: containerIdSchema,
querystring: execQueryString,
},
websocket: true,
}, async (connection, { params: { containerId }, query: { tty, cmd }, log }) => {
await inspectFilteredContainer(dockerFilter, containerId)
const abort = new AbortController()
const exec = await docker.getContainer(containerId).exec({
AttachStdin: true,
AttachStdout: true,
AttachStderr: true,
Cmd: cmd,
Tty: tty,
abortSignal: abort.signal,
})

const execStream = await exec.start({
hijack: true,
stdin: true,
Tty: tty,
})

execStream.on('close', () => { connection.socket.close() })
execStream.on('error', err => { log.warn('execStream error %j', inspect(err)) })
connection.socket.on('close', () => {
abort.abort()
execStream.destroy()
})

const inspectResults = await exec.inspect()
log.debug('exec %s: %j', containerId, inspect(inspectResults))

const wsStream = createWebSocketStream(connection.socket)
wsStream.on('error', err => {
const level = err.message === 'aborted' || err.message.includes('WebSocket is not open') ? 'debug' : 'warn'
log[level]('wsStream error %j', inspect(err))
})

if (tty) {
execStream.pipe(wsStream, { end: false }).pipe(execStream)
} else {
docker.modem.demuxStream(execStream, wsStream, wsStream)
wsStream.pipe(execStream)
}

return undefined
})
}

export default handler
10 changes: 10 additions & 0 deletions packages/compose-tunnel-agent/src/api-server/containers/filter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { DockerFilterClient } from '../../docker'
import { ContainerNotFoundError } from './errors'

export const inspectFilteredContainer = async (dockerFilter: DockerFilterClient, containerId: string) => {
const container = await dockerFilter.inspectContainer(containerId)
if (!container) {
throw new ContainerNotFoundError(containerId)
}
return container
}
31 changes: 31 additions & 0 deletions packages/compose-tunnel-agent/src/api-server/containers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import Dockerode from 'dockerode'
import { FastifyPluginAsync } from 'fastify'
import z from 'zod'
import fastifyWebsocket from '@fastify/websocket'
import { DockerFilterClient } from '../../docker'
import { containerIdSchema } from './schema'
import exec from './exec'
import logs from './logs'
import { inspectFilteredContainer } from './filter'

export const containers: FastifyPluginAsync<{
docker: Dockerode
dockerFilter: DockerFilterClient
}> = async (app, { docker, dockerFilter }) => {
app.get('/', async () => await dockerFilter.listContainers())

app.get<{
Params: z.infer<typeof containerIdSchema>
}>('/:containerId', {
schema: {
params: containerIdSchema,
},
}, async ({ params: { containerId } }, res) => {
const container = await inspectFilteredContainer(dockerFilter, containerId)
void res.send(container)
})

await app.register(fastifyWebsocket)
await app.register(exec, { docker, dockerFilter })
await app.register(logs, { docker, dockerFilter })
}
56 changes: 56 additions & 0 deletions packages/compose-tunnel-agent/src/api-server/containers/logs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { inspect } from 'util'
import { createWebSocketStream } from 'ws'
import z from 'zod'
import { FastifyPluginAsync } from 'fastify'
import Dockerode from 'dockerode'
import { DockerFilterClient } from '../../docker'
import { containerIdSchema, logsQueryString } from './schema'
import { inspectFilteredContainer } from './filter'

const handler: FastifyPluginAsync<{
docker: Dockerode
dockerFilter: DockerFilterClient
}> = async (app, { docker, dockerFilter }) => {
app.get<{
Params: z.infer<typeof containerIdSchema>
Querystring: z.infer<typeof logsQueryString>
}>('/:containerId/logs', {
schema: {
params: containerIdSchema,
querystring: logsQueryString,
},
websocket: true,
}, async (
connection,
{ params: { containerId }, query: { stdout, stderr, since, until, timestamps, tail }, log }
) => {
await inspectFilteredContainer(dockerFilter, containerId)
const abort = new AbortController()
const logStream = await docker.getContainer(containerId).logs({
stdout,
stderr,
since,
until,
timestamps,
tail,
follow: true,
abortSignal: abort.signal,
})

logStream.on('close', async () => { connection.socket.close() })
logStream.on('error', err => {
if (err.message !== 'aborted') {
log.error('logs stream error %j', inspect(err))
}
})
connection.socket.on('close', () => { abort.abort() })

const wsStream = createWebSocketStream(connection.socket)
wsStream.on('error', err => { log.error('wsStream error %j', inspect(err)) })
docker.modem.demuxStream(logStream, wsStream, wsStream)

return undefined
})
}

export default handler
19 changes: 19 additions & 0 deletions packages/compose-tunnel-agent/src/api-server/containers/schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import z from 'zod'

export const containerIdSchema = z.object({
containerId: z.string(),
})

export const execQueryString = z.object({
cmd: z.array(z.string()).optional().default(['sh']),
tty: z.coerce.boolean().optional().default(true),
})

export const logsQueryString = z.object({
stdout: z.coerce.boolean().optional(),
stderr: z.coerce.boolean().optional(),
since: z.string().optional(),
until: z.string().optional(),
timestamps: z.coerce.boolean().optional(),
tail: z.coerce.number().optional(),
})
Loading