From 79075996cc418cd028992621cdeebaf3775ec1c2 Mon Sep 17 00:00:00 2001 From: Roy Razon Date: Sun, 28 Jan 2024 10:13:51 +0200 Subject: [PATCH] kube driver: release sockets used by kube API (#396) attempt to fix hanging process on exit --- .../src/driver/client/index.ts | 12 ++++++--- .../src/driver/client/port-forward.ts | 27 +++++++------------ 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/packages/driver-kube-pod/src/driver/client/index.ts b/packages/driver-kube-pod/src/driver/client/index.ts index 1427d26a..6bb06773 100644 --- a/packages/driver-kube-pod/src/driver/client/index.ts +++ b/packages/driver-kube-pod/src/driver/client/index.ts @@ -70,6 +70,10 @@ const ensureSingleDockerHostStatefulSet = ( } } +const noForever: k8s.Interceptor = (opts => { opts.forever = false }) +type HasAddInterceptor = { addInterceptor(interceptor: k8s.Interceptor): void } +const addNoForeverInterceptor = (o: T) => { o.addInterceptor(noForever); return o } + export const kubeClient = ({ log, namespace, @@ -84,8 +88,8 @@ export const kubeClient = ({ profileId: string }) => { const wrap = logError(log) - const k8sApi = kc.makeApiClient(k8s.CoreV1Api) - const k8sAppsApi = kc.makeApiClient(k8s.AppsV1Api) + const k8sApi = addNoForeverInterceptor(kc.makeApiClient(k8s.CoreV1Api)) + const k8sAppsApi = addNoForeverInterceptor(kc.makeApiClient(k8s.AppsV1Api)) const podHelpers = createPodHelpers({ k8sApi, k8sAppsApi, wrap }) const appsV1ApiHelpers = createAppsV1ApiHelpers(k8sAppsApi, { wrap }) @@ -187,8 +191,8 @@ export const kubeCreationClient = ({ storageSize: number }) => { const wrap = logError(log) - const k8sAppsApi = kc.makeApiClient(k8s.AppsV1Api) - const k8sObjApi = kc.makeApiClient(k8s.KubernetesObjectApi) + const k8sAppsApi = addNoForeverInterceptor(kc.makeApiClient(k8s.AppsV1Api)) + const k8sObjApi = addNoForeverInterceptor(kc.makeApiClient(k8s.KubernetesObjectApi)) const watcher = new k8s.Watch(kc) const appsV1ApiHelpers = createAppsV1ApiHelpers(k8sAppsApi, { wrap }) diff --git a/packages/driver-kube-pod/src/driver/client/port-forward.ts b/packages/driver-kube-pod/src/driver/client/port-forward.ts index 9e37cb72..39c64f02 100644 --- a/packages/driver-kube-pod/src/driver/client/port-forward.ts +++ b/packages/driver-kube-pod/src/driver/client/port-forward.ts @@ -7,8 +7,6 @@ type ForwardSocket = AsyncDisposable & { localSocket: string | AddressInfo } -type Closable = { close: () => void } - const portForward = ( { namespace, forward, log }: { namespace: string; forward: k8s.PortForward; log: Logger }, ) => ( @@ -16,26 +14,19 @@ const portForward = ( targetPort: number, listenAddress: number | string | ListenOptions, ) => new Promise((resolve, reject) => { - const sockets = new Set() - const server = net.createServer(async socket => { + const sockets = new Set() + const server = net.createServer(socket => { socket.on('error', err => { log.debug('forward socket error', err) }) // prevent unhandled rejection - const forwardResult = await forward.portForward(namespace, podName, [targetPort], socket, null, socket, 10) + sockets.add(socket) + socket.unref() + socket.on('close', () => { sockets.delete(socket) }) + socket.on('end', () => { sockets.delete(socket) }) + + forward.portForward(namespace, podName, [targetPort], socket, null, socket, 10) .catch(err => { log.debug('forward api error', err) socket.emit('error', err) }) - - if (!forwardResult) { - return - } - - const ws = typeof forwardResult === 'function' ? forwardResult() : forwardResult - if (!ws) { - return - } - sockets.add(ws) - ws.on('close', () => { sockets.delete(ws) }) - ws.on('error', err => { log.debug('websocket error', err) }) // prevent unhandled rejection }) server.on('error', reject) @@ -46,7 +37,7 @@ const portForward = ( resolve({ localSocket: server.address() as string | AddressInfo, [Symbol.asyncDispose]: () => { - sockets.forEach(ws => ws.close()) + sockets.forEach(s => s.destroy()) return closeServer() }, })