Skip to content

Commit

Permalink
cta: narrow down docker API
Browse files Browse the repository at this point in the history
replace generic docker API proxy with specific endpoints
  • Loading branch information
Roy Razon committed Aug 30, 2023
1 parent e0c2173 commit 68b5d29
Show file tree
Hide file tree
Showing 30 changed files with 473 additions and 384 deletions.
3 changes: 2 additions & 1 deletion packages/cli/src/commands/logs.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import yaml from 'yaml'
import { Args, Flags, Interfaces } from '@oclif/core'
import {
COMPOSE_TUNNEL_AGENT_SERVICE_NAME, addBaseComposeTunnelAgentService,
addBaseComposeTunnelAgentService,
localComposeClient, wrapWithDockerSocket, findEnvId, MachineConnection, ComposeModel, remoteUserModel,
} from '@preevy/core'
import { COMPOSE_TUNNEL_AGENT_SERVICE_NAME } from '@preevy/common'
import DriverCommand from '../driver-command'
import { envIdFlags } from '../common-flags'

Expand Down
1 change: 1 addition & 0 deletions packages/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ export { Logger } from './src/log'
export { requiredEnv, numberFromEnv } from './src/env'
export { tunnelNameResolver, TunnelNameResolver } from './src/tunnel-name'
export { editUrl } from './src/url'
export * from './src/compose-tunnel-agent'
export { MachineStatusCommand, DockerMachineStatusCommandRecipe } from './src/machine-status-command'
export { ProcessOutputBuffers, orderedOutput, OrderedOutput } from './src/process-output-buffers'
10 changes: 10 additions & 0 deletions packages/common/src/compose-tunnel-agent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export const COMPOSE_TUNNEL_AGENT_SERVICE_LABELS = {
PROFILE_THUMBPRINT: 'preevy.profile_thumbprint',
PRIVATE_MODE: 'preevy.private_mode',
ENV_ID: 'preevy.env_id',
ACCESS: 'preevy.access',
EXPOSE: 'preevy.expose',
}

export const COMPOSE_TUNNEL_AGENT_SERVICE_NAME = 'preevy_proxy'
export const COMPOSE_TUNNEL_AGENT_PORT = 3000
49 changes: 25 additions & 24 deletions packages/compose-tunnel-agent/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,22 @@ import {
SshConnectionConfig,
tunnelNameResolver,
MachineStatusCommand,
COMPOSE_TUNNEL_AGENT_PORT,
} from '@preevy/common'
import createDockerClient from './src/docker'
import createApiServerHandler from './src/http/api-server'
import createApiServerHandler from './src/api-server'
import { sshClient as createSshClient } from './src/ssh'
import { createDockerProxyHandlers } from './src/http/docker-proxy'
import { tryHandler, tryUpgradeHandler } from './src/http/http-server-helpers'
import { httpServerHandlers } from './src/http'
import { tryHandler, tryUpgradeHandler } from './src/api-server/http-server-helpers'
import { runMachineStatusCommand } from './src/machine-status'
import { envMetadata } from './src/metadata'
import { readAllFiles } from './src/files'
import { eventsClient as dockerEventsClient, filteredClient as dockerFilteredClient } from './src/docker'

const homeDir = process.env.HOME || '/root'
const dockerSocket = '/var/run/docker.sock'

const targetComposeProject = process.env.COMPOSE_PROJECT
const defaultAccess = process.env.DEFAULT_ACCESS_LEVEL === 'private' ? 'private' : 'public'

const sshConnectionConfigFromEnv = async (): Promise<{ connectionConfig: SshConnectionConfig; sshUrl: string }> => {
const sshUrl = requiredEnv('SSH_URL')
const parsed = parseSshUrl(sshUrl)
Expand Down Expand Up @@ -72,7 +74,13 @@ const main = async () => {
})

const docker = new Docker({ socketPath: dockerSocket })
const dockerClient = createDockerClient({ log: log.child({ name: 'docker' }), docker, debounceWait: 500 })
const dockerClient = dockerEventsClient({
log: log.child({ name: 'docker' }),
docker,
debounceWait: 500,
defaultAccess,
composeProject: targetComposeProject,
})

const sshLog = log.child({ name: 'ssh' })
const sshClient = await createSshClient({
Expand All @@ -95,28 +103,21 @@ const main = async () => {
},
})

const apiListenAddress = process.env.PORT ?? 3000
const apiListenAddress = process.env.PORT ?? COMPOSE_TUNNEL_AGENT_PORT
if (typeof apiListenAddress === 'string' && Number.isNaN(Number(apiListenAddress))) {
await rimraf(apiListenAddress)
}

const { handler, upgradeHandler } = httpServerHandlers({
log: log.child({ name: 'http' }),
apiHandler: createApiServerHandler({
log: log.child({ name: 'api' }),
currentSshState: async () => (await currentTunnels),
machineStatus: machineStatusCommand
? async () => await runMachineStatusCommand({ log, docker })(machineStatusCommand)
: undefined,
envMetadata: await envMetadata({ env: process.env, log }),
composeModelPath: '/preevy/docker-compose.yaml',
}),
dockerProxyHandlers: createDockerProxyHandlers({
log: log.child({ name: 'docker-proxy' }),
dockerSocket,
docker,
}),
dockerProxyPrefix: '/docker/',
const { handler, upgradeHandler } = createApiServerHandler({
log: log.child({ name: 'api' }),
currentSshState: async () => (await currentTunnels),
machineStatus: machineStatusCommand
? async () => await runMachineStatusCommand({ log, docker })(machineStatusCommand)
: undefined,
envMetadata: await envMetadata({ env: process.env, log }),
composeModelPath: '/preevy/docker-compose.yaml',
docker,
dockerFilter: dockerFilteredClient({ docker, composeProject: targetComposeProject }),
})

const httpLog = log.child({ name: 'http' })
Expand Down
13 changes: 13 additions & 0 deletions packages/compose-tunnel-agent/src/api-server/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { BadRequestError, NotFoundError } from './http-server-helpers'

export class MissingContainerIdError extends BadRequestError {
constructor() {
super('Missing container id')
}
}

export class ContainerNotFoundError extends NotFoundError {
constructor(containerId: string) {
super(`Container "${containerId}" does not exist or is not managed by this agent`)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import { Logger } from '@preevy/common'
import http from 'node:http'
import stream from 'node:stream'
import { inspect } from 'node:util'
import { WebSocket } from 'ws'

export const respond = (res: http.ServerResponse, content: string, type = 'text/plain', status = 200) => {
export const respond = (res: http.ServerResponse, content: string | Buffer, type = 'text/plain', status = 200) => {
res.writeHead(status, { 'Content-Type': type })
res.end(content)
}
Expand Down Expand Up @@ -105,13 +106,43 @@ export const errorUpgradeHandler = (
log.warn('caught error: %j in upgrade %s %s', inspect(err), req.method || '', req.url || '')
}

export type UpgradeHandler = (req: http.IncomingMessage, socket: stream.Duplex, head: Buffer) => Promise<void>

export const tryUpgradeHandler = (
{ log }: { log: Logger },
f: (req: http.IncomingMessage, socket: stream.Duplex, head: Buffer) => Promise<void>
f: UpgradeHandler,
) => async (req: http.IncomingMessage, socket: stream.Duplex, head: Buffer) => {
try {
await f(req, socket, head)
} catch (err) {
errorUpgradeHandler(log, err, req, socket)
}
}

export const errorWsHandler = (
log: Logger,
err: unknown,
ws: WebSocket,
req: http.IncomingMessage,
) => {
const [code, message]: [number, string] = err instanceof HttpError
? [err.status, err.clientMessage]
: [500, InternalError.defaultMessage]

const wsCode = 4000 + code // https://github.com/websockets/ws/issues/715#issuecomment-504702511
ws.close(wsCode, message)
log.warn('caught error: %j in ws %s %s', inspect(err), req.method || '', req.url || '')
}

export type WsHandler = (ws: WebSocket, req: http.IncomingMessage) => Promise<void>

export const tryWsHandler = (
{ log }: { log: Logger },
f: WsHandler,
) => async (ws: WebSocket, req: http.IncomingMessage) => {
try {
await f(ws, req)
} catch (err) {
errorWsHandler(log, err, ws, req)
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import http from 'node:http'
import net from 'node:net'
import { describe, expect, beforeAll, afterAll, test, jest, it } from '@jest/globals'
import { describe, expect, beforeAll, afterAll, jest, it } from '@jest/globals'
import { ChildProcess, spawn, exec } from 'child_process'
import pino from 'pino'
import pinoPretty from 'pino-pretty'
Expand All @@ -10,21 +10,26 @@ import { inspect, promisify } from 'node:util'
import waitForExpect from 'wait-for-expect'
import WebSocket from 'ws'
import stripAnsi from 'strip-ansi'
import { createDockerProxyHandlers } from '.'
import createApiServerHandlers from '.'
import { filteredClient } from '../docker'
import { SshState } from '../ssh'
import { COMPOSE_PROJECT_LABEL } from '../docker/labels'

const TEST_COMPOSE_PROJECT = 'my-project'

const setupDockerContainer = () => {
let dockerProcess: ChildProcess
let containerName: string
let output: Buffer[]
jest.setTimeout(100000)
jest.setTimeout(20000)

beforeAll(() => {
containerName = `test-docker-proxy-${Math.random().toString(36).substring(2, 9)}`
output = []
dockerProcess = spawn(
'docker',
[
...`run --rm --name ${containerName} busybox sh -c`.split(' '),
...`run --rm --name ${containerName} --label ${COMPOSE_PROJECT_LABEL}=${TEST_COMPOSE_PROJECT} busybox sh -c`.split(' '),
'while true; do echo "hello stdout"; >&2 echo "hello stderr"; sleep 0.1; done',
]
)
Expand All @@ -50,7 +55,7 @@ const setupDockerContainer = () => {
}
}

const setupDockerProxy = () => {
const setupApiServer = () => {
const log = pino({
level: 'debug',
}, pinoPretty({ destination: pino.destination(process.stderr) }))
Expand All @@ -60,7 +65,13 @@ const setupDockerProxy = () => {

beforeAll(async () => {
const docker = new Dockerode()
const handlers = createDockerProxyHandlers({ log, docker, dockerSocket: '/var/run/docker.sock' })
const handlers = createApiServerHandlers({
log,
docker,
dockerFilter: filteredClient({ docker, composeProject: TEST_COMPOSE_PROJECT }),
composeModelPath: '',
currentSshState: () => Promise.resolve({} as unknown as SshState),
})
server = http.createServer(handlers.handler).on('upgrade', handlers.upgradeHandler)

const serverPort = await new Promise<number>(resolve => {
Expand Down Expand Up @@ -121,58 +132,31 @@ const openWebSocket = (url: string) => new Promise<OpenWebSocket>((resolve, reje
})
})

describe('docker proxy', () => {
describe('docker api', () => {
const { containerName } = setupDockerContainer()
const { serverBaseUrl } = setupDockerProxy()
const { serverBaseUrl } = setupApiServer()

const waitForContainerId = async () => {
let containerId = ''
await waitForExpect(async () => {
const containers = await fetchJson(`http://${serverBaseUrl()}/containers/json`) as { Id: string; Names: string[] }[]
const containers = await fetchJson(`http://${serverBaseUrl()}/containers`) as { Id: string; Names: string[] }[]
const container = containers.find(({ Names: names }) => names.includes(`/${containerName()}`))
expect(container).toBeDefined()
containerId = container?.Id as string
}, 3000, 100)
return containerId
}

test('use the docker API', async () => {
expect(await waitForContainerId()).toBeDefined()
})

describe('exec', () => {
const createExec = async (containerId: string, tty: boolean) => {
const { Id: execId } = await fetchJson(`http://${serverBaseUrl()}/containers/${containerId}/exec`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
AttachStdin: true,
AttachStdout: true,
AttachStderr: true,
Tty: tty,
Cmd: ['sh'],
}),
})

return execId
}

let execId: string
let containerId: string

beforeAll(async () => {
containerId = await waitForContainerId()
})

describe('tty=true', () => {
beforeAll(async () => {
execId = await createExec(containerId, true)
})

it('should communicate via websocket', async () => {
const { receivedBuffers, send, close } = await openWebSocket(`ws://${serverBaseUrl()}/exec/${execId}/start`)
const { receivedBuffers, send, close } = await openWebSocket(`ws://${serverBaseUrl()}/container/${containerId}/exec`)
await waitForExpect(() => expect(receivedBuffers.length).toBeGreaterThan(0))
await send('ls \n')
await waitForExpect(() => {
Expand All @@ -186,12 +170,8 @@ describe('docker proxy', () => {
})

describe('tty=false', () => {
beforeAll(async () => {
execId = await createExec(containerId, false)
})

it('should communicate via websocket', async () => {
const { receivedBuffers, send, close } = await openWebSocket(`ws://${serverBaseUrl()}/exec/${execId}/start`)
const { receivedBuffers, send, close } = await openWebSocket(`ws://${serverBaseUrl()}/container/${containerId}/exec`)
await waitForExpect(async () => {
await send('ls\n')
const received = Buffer.concat(receivedBuffers).toString('utf-8')
Expand All @@ -214,7 +194,7 @@ describe('docker proxy', () => {
const testStream = (...s: LogStream[]) => {
describe(`${s.join(' and ')}`, () => {
it(`should show the ${s.join(' and ')} logs via websocket`, async () => {
const { receivedBuffers, close } = await openWebSocket(`ws://${serverBaseUrl()}/containers/${containerId}/logs?${s.map(st => `${st}=true`).join('&')}`)
const { receivedBuffers, close } = await openWebSocket(`ws://${serverBaseUrl()}/container/${containerId}/logs?${s.map(st => `${st}=true`).join('&')}`)
await waitForExpect(() => expect(receivedBuffers.length).toBeGreaterThan(0))
const length1 = receivedBuffers.length
await waitForExpect(() => {
Expand All @@ -240,7 +220,7 @@ describe('docker proxy', () => {

describe('timestamps', () => {
it('should show the logs with a timestamp', async () => {
const { receivedBuffers, close } = await openWebSocket(`ws://${serverBaseUrl()}/containers/${containerId}/logs?stdout=true&timestamps=true`)
const { receivedBuffers, close } = await openWebSocket(`ws://${serverBaseUrl()}/container/${containerId}/logs?stdout=true&timestamps=true`)
await waitForExpect(() => expect(receivedBuffers.length).toBeGreaterThan(0))
const received = Buffer.concat(receivedBuffers).toString('utf-8')
expect(received).toMatch(/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d*Z/)
Expand Down
Loading

0 comments on commit 68b5d29

Please sign in to comment.