Skip to content

Commit

Permalink
kube driver: release sockets used by kube API (#396)
Browse files Browse the repository at this point in the history
attempt to fix hanging process on exit
  • Loading branch information
Roy Razon authored Jan 28, 2024
1 parent 43914e4 commit 7907599
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 22 deletions.
12 changes: 8 additions & 4 deletions packages/driver-kube-pod/src/driver/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ const ensureSingleDockerHostStatefulSet = (
}
}

const noForever: k8s.Interceptor = (opts => { opts.forever = false })
type HasAddInterceptor = { addInterceptor(interceptor: k8s.Interceptor): void }
const addNoForeverInterceptor = <T extends HasAddInterceptor>(o: T) => { o.addInterceptor(noForever); return o }

export const kubeClient = ({
log,
namespace,
Expand All @@ -84,8 +88,8 @@ export const kubeClient = ({
profileId: string
}) => {
const wrap = logError(log)
const k8sApi = kc.makeApiClient(k8s.CoreV1Api)
const k8sAppsApi = kc.makeApiClient(k8s.AppsV1Api)
const k8sApi = addNoForeverInterceptor(kc.makeApiClient(k8s.CoreV1Api))
const k8sAppsApi = addNoForeverInterceptor(kc.makeApiClient(k8s.AppsV1Api))

const podHelpers = createPodHelpers({ k8sApi, k8sAppsApi, wrap })
const appsV1ApiHelpers = createAppsV1ApiHelpers(k8sAppsApi, { wrap })
Expand Down Expand Up @@ -187,8 +191,8 @@ export const kubeCreationClient = ({
storageSize: number
}) => {
const wrap = logError(log)
const k8sAppsApi = kc.makeApiClient(k8s.AppsV1Api)
const k8sObjApi = kc.makeApiClient(k8s.KubernetesObjectApi)
const k8sAppsApi = addNoForeverInterceptor(kc.makeApiClient(k8s.AppsV1Api))
const k8sObjApi = addNoForeverInterceptor(kc.makeApiClient(k8s.KubernetesObjectApi))
const watcher = new k8s.Watch(kc)

const appsV1ApiHelpers = createAppsV1ApiHelpers(k8sAppsApi, { wrap })
Expand Down
27 changes: 9 additions & 18 deletions packages/driver-kube-pod/src/driver/client/port-forward.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,26 @@ type ForwardSocket = AsyncDisposable & {
localSocket: string | AddressInfo
}

type Closable = { close: () => void }

const portForward = (
{ namespace, forward, log }: { namespace: string; forward: k8s.PortForward; log: Logger },
) => (
podName: string,
targetPort: number,
listenAddress: number | string | ListenOptions,
) => new Promise<ForwardSocket>((resolve, reject) => {
const sockets = new Set<Closable>()
const server = net.createServer(async socket => {
const sockets = new Set<net.Socket>()
const server = net.createServer(socket => {
socket.on('error', err => { log.debug('forward socket error', err) }) // prevent unhandled rejection
const forwardResult = await forward.portForward(namespace, podName, [targetPort], socket, null, socket, 10)
sockets.add(socket)
socket.unref()
socket.on('close', () => { sockets.delete(socket) })
socket.on('end', () => { sockets.delete(socket) })

forward.portForward(namespace, podName, [targetPort], socket, null, socket, 10)
.catch(err => {
log.debug('forward api error', err)
socket.emit('error', err)
})

if (!forwardResult) {
return
}

const ws = typeof forwardResult === 'function' ? forwardResult() : forwardResult
if (!ws) {
return
}
sockets.add(ws)
ws.on('close', () => { sockets.delete(ws) })
ws.on('error', err => { log.debug('websocket error', err) }) // prevent unhandled rejection
})

server.on('error', reject)
Expand All @@ -46,7 +37,7 @@ const portForward = (
resolve({
localSocket: server.address() as string | AddressInfo,
[Symbol.asyncDispose]: () => {
sockets.forEach(ws => ws.close())
sockets.forEach(s => s.destroy())
return closeServer()
},
})
Expand Down

0 comments on commit 7907599

Please sign in to comment.