From ab9c34b01a037b13516c7e2904b931e6e35387fe Mon Sep 17 00:00:00 2001 From: Kautilya Tripathi Date: Mon, 25 Nov 2024 11:30:43 +0530 Subject: [PATCH] frontend: Websocket backward compatibility This adds a new way to use new way to run websocket multiplexer. Default way would be the legacy way which creates multiple websocket connection. This adds a new flag `REACT_APP_ENABLE_WEBSOCKET_MULTIPLEXER` to run the new API. Signed-off-by: Kautilya Tripathi --- frontend/package.json | 1 + frontend/src/helpers/index.ts | 9 + frontend/src/lib/k8s/api/v2/hooks.ts | 5 +- .../lib/k8s/api/v2/useKubeObjectList.test.tsx | 112 +++ .../src/lib/k8s/api/v2/useKubeObjectList.ts | 219 +++++- frontend/src/lib/k8s/api/v2/webSocket.test.ts | 382 ++++++++++ frontend/src/lib/k8s/api/v2/webSocket.ts | 675 ++++++++++++------ .../plugin/__snapshots__/pluginLib.snapshot | 2 +- 8 files changed, 1143 insertions(+), 262 deletions(-) create mode 100644 frontend/src/lib/k8s/api/v2/webSocket.test.ts diff --git a/frontend/package.json b/frontend/package.json index 55efa85ff4..50c28eb622 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -120,6 +120,7 @@ "build": "cross-env PUBLIC_URL=./ NODE_OPTIONS=--max-old-space-size=8096 vite build && npx shx rm -f build/frontend/index.baseUrl.html", "pretest": "npm run make-version", "test": "vitest", + "start-without-multiplexer": "cross-env REACT_APP_ENABLE_WEBSOCKET_MULTIPLEXER=false npm run start", "lint": "eslint --cache -c package.json --ext .js,.ts,.tsx src/ ../app/electron ../plugins/headlamp-plugin --ignore-pattern ../plugins/headlamp-plugin/template --ignore-pattern ../plugins/headlamp-plugin/lib/", "format": "prettier --config package.json --write --cache src ../app/electron ../app/tsconfig.json ../app/scripts ../plugins/headlamp-plugin/bin ../plugins/headlamp-plugin/config ../plugins/headlamp-plugin/template ../plugins/headlamp-plugin/test*.js ../plugins/headlamp-plugin/*.json ../plugins/headlamp-plugin/*.js", "format-check": "prettier --config package.json --check --cache src ../app/electron ../app/tsconfig.json ../app/scripts ../plugins/headlamp-plugin/bin ../plugins/headlamp-plugin/config ../plugins/headlamp-plugin/template ../plugins/headlamp-plugin/test*.js ../plugins/headlamp-plugin/*.json ../plugins/headlamp-plugin/*.js", diff --git a/frontend/src/helpers/index.ts b/frontend/src/helpers/index.ts index 40780c37e9..e3500745c7 100644 --- a/frontend/src/helpers/index.ts +++ b/frontend/src/helpers/index.ts @@ -352,6 +352,14 @@ function loadTableSettings(tableId: string): { id: string; show: boolean }[] { return settings; } +/** + * @returns true if the websocket multiplexer is enabled. + * defaults to true. This is a feature flag to enable the websocket multiplexer. + */ +export function getWebsocketMultiplexerEnabled(): boolean { + return import.meta.env.REACT_APP_ENABLE_WEBSOCKET_MULTIPLEXER !== 'false'; +} + /** * The backend token to use when making API calls from Headlamp when running as an app. * The app opens the index.html?backendToken=... and passes the token to the frontend @@ -393,6 +401,7 @@ const exportFunctions = { storeClusterSettings, loadClusterSettings, getHeadlampAPIHeaders, + getWebsocketMultiplexerEnabled, storeTableSettings, loadTableSettings, }; diff --git a/frontend/src/lib/k8s/api/v2/hooks.ts b/frontend/src/lib/k8s/api/v2/hooks.ts index 4dc082b56c..a88b0b30c9 100644 --- a/frontend/src/lib/k8s/api/v2/hooks.ts +++ b/frontend/src/lib/k8s/api/v2/hooks.ts @@ -4,6 +4,7 @@ import { getCluster } from '../../../cluster'; import { ApiError, QueryParameters } from '../../apiProxy'; import { KubeObject, KubeObjectInterface } from '../../KubeObject'; import { clusterFetch } from './fetch'; +import { KubeListUpdateEvent } from './KubeList'; import { KubeObjectEndpoint } from './KubeObjectEndpoint'; import { makeUrl } from './makeUrl'; import { useWebSocket } from './webSocket'; @@ -132,7 +133,7 @@ export function useKubeObject({ const data: Instance | null = query.error ? null : query.data ?? null; - useWebSocket({ + useWebSocket>({ url: () => makeUrl([KubeObjectEndpoint.toUrl(endpoint!)], { ...cleanedUpQueryParams, @@ -141,7 +142,7 @@ export function useKubeObject({ }), enabled: !!endpoint && !!data, cluster, - onMessage(update) { + onMessage(update: KubeListUpdateEvent) { if (update.type !== 'ADDED' && update.object) { client.setQueryData(queryKey, new kubeObjectClass(update.object)); } diff --git a/frontend/src/lib/k8s/api/v2/useKubeObjectList.test.tsx b/frontend/src/lib/k8s/api/v2/useKubeObjectList.test.tsx index 5b8957ad56..d6fe4bae5d 100644 --- a/frontend/src/lib/k8s/api/v2/useKubeObjectList.test.tsx +++ b/frontend/src/lib/k8s/api/v2/useKubeObjectList.test.tsx @@ -1,5 +1,6 @@ import { QueryClient, QueryClientProvider } from '@tanstack/react-query'; import { renderHook } from '@testing-library/react'; +import { describe, expect, it, vi } from 'vitest'; import { kubeObjectListQuery, ListResponse, @@ -8,6 +9,18 @@ import { } from './useKubeObjectList'; import * as websocket from './webSocket'; +// Mock WebSocket functionality +const mockUseWebSockets = vi.fn(); +const mockSubscribe = vi.fn().mockImplementation(() => Promise.resolve(() => {})); + +vi.mock('./webSocket', () => ({ + useWebSockets: (...args: any[]) => mockUseWebSockets(...args), + WebSocketManager: { + subscribe: (...args: any[]) => mockSubscribe(...args), + }, + BASE_WS_URL: 'http://localhost:3000', +})); + describe('makeListRequests', () => { describe('for non namespaced resource', () => { it('should not include namespace in requests', () => { @@ -85,6 +98,11 @@ const mockClass = class { } as any; describe('useWatchKubeObjectLists', () => { + beforeEach(() => { + vi.stubEnv('REACT_APP_ENABLE_WEBSOCKET_MULTIPLEXER', 'false'); + vi.clearAllMocks(); + }); + it('should not be enabled when no endpoint is provided', () => { const spy = vi.spyOn(websocket, 'useWebSockets'); const queryClient = new QueryClient(); @@ -221,3 +239,97 @@ describe('useWatchKubeObjectLists', () => { ).toBe(objectB); }); }); + +describe('useWatchKubeObjectLists (Multiplexer)', () => { + beforeEach(() => { + vi.stubEnv('REACT_APP_ENABLE_WEBSOCKET_MULTIPLEXER', 'true'); + vi.clearAllMocks(); + }); + + it('should subscribe using WebSocketManager when multiplexer is enabled', () => { + const lists = [{ cluster: 'cluster-a', namespace: 'namespace-a', resourceVersion: '1' }]; + + renderHook( + () => + useWatchKubeObjectLists({ + kubeObjectClass: mockClass, + endpoint: { version: 'v1', resource: 'pods' }, + lists, + }), + { + wrapper: ({ children }) => ( + {children} + ), + } + ); + + expect(mockSubscribe).toHaveBeenCalledWith( + 'cluster-a', + expect.stringContaining('/api/v1/namespaces/namespace-a/pods'), + 'watch=1&resourceVersion=1', + expect.any(Function) + ); + }); + + it('should subscribe to multiple clusters', () => { + const lists = [ + { cluster: 'cluster-a', namespace: 'namespace-a', resourceVersion: '1' }, + { cluster: 'cluster-b', namespace: 'namespace-b', resourceVersion: '2' }, + ]; + + renderHook( + () => + useWatchKubeObjectLists({ + kubeObjectClass: mockClass, + endpoint: { version: 'v1', resource: 'pods' }, + lists, + }), + { + wrapper: ({ children }) => ( + {children} + ), + } + ); + + expect(mockSubscribe).toHaveBeenCalledTimes(2); + expect(mockSubscribe).toHaveBeenNthCalledWith( + 1, + 'cluster-a', + expect.stringContaining('/api/v1/namespaces/namespace-a/pods'), + 'watch=1&resourceVersion=1', + expect.any(Function) + ); + expect(mockSubscribe).toHaveBeenNthCalledWith( + 2, + 'cluster-b', + expect.stringContaining('/api/v1/namespaces/namespace-b/pods'), + 'watch=1&resourceVersion=2', + expect.any(Function) + ); + }); + + it('should handle non-namespaced resources', () => { + const lists = [{ cluster: 'cluster-a', resourceVersion: '1' }]; + + renderHook( + () => + useWatchKubeObjectLists({ + kubeObjectClass: mockClass, + endpoint: { version: 'v1', resource: 'pods' }, + lists, + }), + { + wrapper: ({ children }) => ( + {children} + ), + } + ); + + expect(mockSubscribe).toHaveBeenCalledWith( + 'cluster-a', + expect.stringContaining('/api/v1/pods'), + 'watch=1&resourceVersion=1', + expect.any(Function) + ); + }); +}); diff --git a/frontend/src/lib/k8s/api/v2/useKubeObjectList.ts b/frontend/src/lib/k8s/api/v2/useKubeObjectList.ts index 4efc8524bc..9de0703435 100644 --- a/frontend/src/lib/k8s/api/v2/useKubeObjectList.ts +++ b/frontend/src/lib/k8s/api/v2/useKubeObjectList.ts @@ -1,14 +1,15 @@ import { QueryObserverOptions, useQueries, useQueryClient } from '@tanstack/react-query'; -import { useEffect, useMemo, useRef, useState } from 'react'; +import { useCallback, useEffect, useMemo, useRef, useState } from 'react'; +import { getWebsocketMultiplexerEnabled } from '../../../../helpers'; import { KubeObject, KubeObjectClass } from '../../KubeObject'; import { ApiError } from '../v1/clusterRequests'; import { QueryParameters } from '../v1/queryParameters'; import { clusterFetch } from './fetch'; import { QueryListResponse, useEndpoints } from './hooks'; -import { KubeList } from './KubeList'; +import { KubeList, KubeListUpdateEvent } from './KubeList'; import { KubeObjectEndpoint } from './KubeObjectEndpoint'; import { makeUrl } from './makeUrl'; -import { BASE_WS_URL, WebSocketManager } from './webSocket'; +import { BASE_WS_URL, useWebSockets, WebSocketManager } from './webSocket'; /** * Object representing a List of Kube object @@ -112,26 +113,75 @@ export function useWatchKubeObjectLists({ /** Which clusters and namespaces to watch */ lists: Array<{ cluster: string; namespace?: string; resourceVersion: string }>; }) { + if (getWebsocketMultiplexerEnabled()) { + return useWatchKubeObjectListsMultiplexed({ + kubeObjectClass, + endpoint, + lists, + queryParams, + }); + } else { + return useWatchKubeObjectListsLegacy({ + kubeObjectClass, + endpoint, + lists, + queryParams, + }); + } +} + +/** + * Watches Kubernetes resource lists using multiplexed WebSocket connections. + * Efficiently manages subscriptions and updates to prevent unnecessary re-renders + * and WebSocket reconnections. + * + * @template K - Type extending KubeObject for the resources being watched + * @param kubeObjectClass - Class constructor for the Kubernetes resource type + * @param endpoint - API endpoint information for the resource + * @param lists - Array of cluster, namespace, and resourceVersion combinations to watch + * @param queryParams - Optional query parameters for the WebSocket URL + */ +function useWatchKubeObjectListsMultiplexed({ + kubeObjectClass, + endpoint, + lists, + queryParams, +}: { + kubeObjectClass: (new (...args: any) => K) & typeof KubeObject; + endpoint?: KubeObjectEndpoint | null; + lists: Array<{ cluster: string; namespace?: string; resourceVersion: string }>; + queryParams?: QueryParameters; +}): void { const client = useQueryClient(); + + // Track the latest resource versions to prevent duplicate updates const latestResourceVersions = useRef>({}); - // Create URLs for all lists + // Stabilize queryParams to prevent unnecessary effect triggers + // Only update when the stringified params change + const stableQueryParams = useMemo(() => queryParams, [JSON.stringify(queryParams)]); + + // Create stable connection URLs for each list + // Updates only when endpoint, lists, or stableQueryParams change const connections = useMemo(() => { - if (!endpoint) return []; + if (!endpoint) { + return []; + } return lists.map(list => { const key = `${list.cluster}:${list.namespace || ''}`; - // Only update resourceVersion if it's newer - if ( - !latestResourceVersions.current[key] || - parseInt(list.resourceVersion) > parseInt(latestResourceVersions.current[key]) - ) { - latestResourceVersions.current[key] = list.resourceVersion; + + // Update resource version if newer one is available + const currentVersion = latestResourceVersions.current[key]; + const newVersion = list.resourceVersion; + if (!currentVersion || parseInt(newVersion) > parseInt(currentVersion)) { + latestResourceVersions.current[key] = newVersion; } + // Construct WebSocket URL with current parameters return { url: makeUrl([KubeObjectEndpoint.toUrl(endpoint, list.namespace)], { - ...queryParams, + ...stableQueryParams, watch: 1, resourceVersion: latestResourceVersions.current[key], }), @@ -139,40 +189,67 @@ export function useWatchKubeObjectLists({ namespace: list.namespace, }; }); - }, [endpoint, lists, queryParams]); + }, [endpoint, lists, stableQueryParams]); + + // Create stable update handler to process WebSocket messages + // Re-create only when dependencies change + const handleUpdate = useCallback( + (update: any, cluster: string, namespace: string | undefined) => { + if (!update || typeof update !== 'object' || !endpoint) { + return; + } + const key = `${cluster}:${namespace || ''}`; + + // Update resource version from incoming message + if (update.object?.metadata?.resourceVersion) { + latestResourceVersions.current[key] = update.object.metadata.resourceVersion; + } + + // Create query key for React Query cache + const queryKey = kubeObjectListQuery( + kubeObjectClass, + endpoint, + namespace, + cluster, + stableQueryParams ?? {} + ).queryKey; + + // Update React Query cache with new data + client.setQueryData(queryKey, (oldResponse: ListResponse | undefined | null) => { + if (!oldResponse) { + return oldResponse; + } + + const newList = KubeList.applyUpdate(oldResponse.list, update, kubeObjectClass); + + // Only update if the list actually changed + if (newList === oldResponse.list) { + return oldResponse; + } + + return { ...oldResponse, list: newList }; + }); + }, + [client, kubeObjectClass, endpoint, stableQueryParams] + ); + + // Set up WebSocket subscriptions useEffect(() => { - if (!endpoint || connections.length === 0) return; + if (!endpoint || connections.length === 0) { + return; + } const cleanups: (() => void)[] = []; + // Create subscriptions for each connection connections.forEach(({ url, cluster, namespace }) => { const parsedUrl = new URL(url, BASE_WS_URL); - const key = `${cluster}:${namespace || ''}`; - WebSocketManager.subscribe(cluster, parsedUrl.pathname, parsedUrl.search.slice(1), update => { - if (!update || typeof update !== 'object') return; - - // Update latest resourceVersion - if (update.object?.metadata?.resourceVersion) { - latestResourceVersions.current[key] = update.object.metadata.resourceVersion; - } - - const queryKey = kubeObjectListQuery( - kubeObjectClass, - endpoint, - namespace, - cluster, - queryParams ?? {} - ).queryKey; - - client.setQueryData(queryKey, (oldResponse: ListResponse | undefined | null) => { - if (!oldResponse) return oldResponse; - const newList = KubeList.applyUpdate(oldResponse.list, update, kubeObjectClass); - if (newList === oldResponse.list) return oldResponse; - return { ...oldResponse, list: newList }; - }); - }).then( + // Subscribe to WebSocket updates + WebSocketManager.subscribe(cluster, parsedUrl.pathname, parsedUrl.search.slice(1), update => + handleUpdate(update, cluster, namespace) + ).then( cleanup => cleanups.push(cleanup), error => { // Track retry count in the URL's searchParams @@ -186,10 +263,74 @@ export function useWatchKubeObjectLists({ ); }); + // Cleanup subscriptions when effect re-runs or unmounts return () => { cleanups.forEach(cleanup => cleanup()); }; - }, [connections, endpoint, client, kubeObjectClass, queryParams]); + }, [connections, endpoint, handleUpdate]); +} + +/** + * Accepts a list of lists to watch. + * Upon receiving update it will modify query data for list query + * @param kubeObjectClass - KubeObject class of the watched resource list + * @param endpoint - Kube resource API endpoint information + * @param lists - Which clusters and namespaces to watch + * @param queryParams - Query parameters for the WebSocket connection URL + */ +function useWatchKubeObjectListsLegacy({ + kubeObjectClass, + endpoint, + lists, + queryParams, +}: { + /** KubeObject class of the watched resource list */ + kubeObjectClass: (new (...args: any) => K) & typeof KubeObject; + /** Query parameters for the WebSocket connection URL */ + queryParams?: QueryParameters; + /** Kube resource API endpoint information */ + endpoint?: KubeObjectEndpoint | null; + /** Which clusters and namespaces to watch */ + lists: Array<{ cluster: string; namespace?: string; resourceVersion: string }>; +}) { + const client = useQueryClient(); + + const connections = useMemo(() => { + if (!endpoint) return []; + + return lists.map(({ cluster, namespace, resourceVersion }) => { + const url = makeUrl([KubeObjectEndpoint.toUrl(endpoint!, namespace)], { + ...queryParams, + watch: 1, + resourceVersion, + }); + + return { + cluster, + url, + onMessage(update: KubeListUpdateEvent) { + const key = kubeObjectListQuery( + kubeObjectClass, + endpoint, + namespace, + cluster, + queryParams ?? {} + ).queryKey; + client.setQueryData(key, (oldResponse: ListResponse | undefined | null) => { + if (!oldResponse) return oldResponse; + + const newList = KubeList.applyUpdate(oldResponse.list, update, kubeObjectClass); + return { ...oldResponse, list: newList }; + }); + }, + }; + }); + }, [lists, kubeObjectClass, endpoint]); + + useWebSockets>({ + enabled: !!endpoint, + connections, + }); } /** diff --git a/frontend/src/lib/k8s/api/v2/webSocket.test.ts b/frontend/src/lib/k8s/api/v2/webSocket.test.ts new file mode 100644 index 0000000000..56d9c9dafe --- /dev/null +++ b/frontend/src/lib/k8s/api/v2/webSocket.test.ts @@ -0,0 +1,382 @@ +import { renderHook } from '@testing-library/react'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import WS from 'vitest-websocket-mock'; +import { findKubeconfigByClusterName, getUserIdFromLocalStorage } from '../../../../stateless'; +import { getToken } from '../../../auth'; +import { getCluster } from '../../../cluster'; +import { BASE_WS_URL, useWebSocket, WebSocketManager } from './webSocket'; + +// Mock dependencies +vi.mock('../../../cluster', () => ({ + getCluster: vi.fn(), +})); + +vi.mock('../../../../stateless', () => ({ + getUserIdFromLocalStorage: vi.fn(), + findKubeconfigByClusterName: vi.fn(), +})); + +vi.mock('../../../auth', () => ({ + getToken: vi.fn(), +})); + +vi.mock('./makeUrl', () => ({ + makeUrl: vi.fn((paths: string[] | string, query = {}) => { + const url = Array.isArray(paths) ? paths.filter(Boolean).join('/') : paths; + const queryString = new URLSearchParams(query).toString(); + const fullUrl = queryString ? `${url}?${queryString}` : url; + return fullUrl.replace(/([^:]\/)\/+/g, '$1'); + }), +})); + +const clusterName = 'test-cluster'; +const userId = 'test-user'; +const token = 'test-token'; + +describe('WebSocket Tests', () => { + let mockServer: WS; + let onMessage: ReturnType; + let onError: ReturnType; + + beforeEach(() => { + vi.stubEnv('REACT_APP_ENABLE_WEBSOCKET_MULTIPLEXER', 'true'); + vi.clearAllMocks(); + onMessage = vi.fn(); + onError = vi.fn(); + (getCluster as ReturnType).mockReturnValue(clusterName); + (getUserIdFromLocalStorage as ReturnType).mockReturnValue(userId); + (getToken as ReturnType).mockReturnValue(token); + (findKubeconfigByClusterName as ReturnType).mockResolvedValue({}); + + mockServer = new WS(`${BASE_WS_URL}wsMultiplexer`); + }); + + afterEach(async () => { + WS.clean(); + vi.restoreAllMocks(); + vi.unstubAllEnvs(); + WebSocketManager.socketMultiplexer = null; + WebSocketManager.connecting = false; + WebSocketManager.isReconnecting = false; + WebSocketManager.listeners.clear(); + WebSocketManager.completedPaths.clear(); + WebSocketManager.activeSubscriptions.clear(); + WebSocketManager.pendingUnsubscribes.clear(); + }); + + describe('WebSocketManager', () => { + it('should establish connection and handle messages', async () => { + const path = '/api/v1/pods'; + const query = 'watch=true'; + + // Subscribe to pod updates + await WebSocketManager.subscribe(clusterName, path, query, onMessage); + await mockServer.connected; + + // Get the subscription message + const subscribeMsg = JSON.parse((await mockServer.nextMessage) as string); + expect(subscribeMsg).toEqual({ + clusterId: clusterName, + path, + query, + userId, + type: 'REQUEST', + }); + + // Send a message from server + const podData = { kind: 'Pod', metadata: { name: 'test-pod' } }; + const serverMessage = { + clusterId: clusterName, + path, + query, + data: JSON.stringify(podData), // Important: data needs to be stringified + type: 'DATA', + }; + + await mockServer.send(JSON.stringify(serverMessage)); + + // Wait for message processing + await vi.waitFor(() => { + expect(onMessage).toHaveBeenCalledWith(podData); + }); + }); + + it('should handle multiple subscriptions', async () => { + const subs = [ + { path: '/api/v1/pods', query: 'watch=true' }, + { path: '/api/v1/services', query: 'watch=true' }, + ]; + + // Subscribe to multiple resources + await Promise.all( + subs.map(sub => WebSocketManager.subscribe(clusterName, sub.path, sub.query, onMessage)) + ); + + await mockServer.connected; + + // Verify subscription messages + for (const sub of subs) { + const msg = JSON.parse((await mockServer.nextMessage) as string); + expect(msg).toEqual({ + clusterId: clusterName, + path: sub.path, + query: sub.query, + userId, + type: 'REQUEST', + }); + + // Send data for this subscription + const resourceData = { + kind: sub.path.includes('pods') ? 'Pod' : 'Service', + metadata: { name: `test-${sub.path}` }, + }; + + const serverMessage = { + clusterId: clusterName, + path: sub.path, + query: sub.query, + data: JSON.stringify(resourceData), + type: 'DATA', + }; + + await mockServer.send(JSON.stringify(serverMessage)); + } + + // Verify all messages were received + await vi.waitFor(() => { + expect(onMessage).toHaveBeenCalledTimes(2); + }); + }); + + it('should handle COMPLETE messages', async () => { + const path = '/api/v1/pods'; + const query = 'watch=true'; + + await WebSocketManager.subscribe(clusterName, path, query, onMessage); + await mockServer.connected; + + // Skip subscription message + await mockServer.nextMessage; + + // Send COMPLETE message + const completeMessage = { + clusterId: clusterName, + path, + query, + type: 'COMPLETE', + }; + + await mockServer.send(JSON.stringify(completeMessage)); + + // Verify the path is marked as completed + const key = WebSocketManager.createKey(clusterName, path, query); + expect(WebSocketManager.completedPaths.has(key)).toBe(true); + }); + + it('should handle unsubscribe', async () => { + const path = '/api/v1/pods'; + const query = 'watch=true'; + + const cleanup = await WebSocketManager.subscribe(clusterName, path, query, onMessage); + await mockServer.connected; + + // Skip subscription message + await mockServer.nextMessage; + + // Unsubscribe + cleanup(); + + // Wait for unsubscribe message (after debounce) + await vi.waitFor(async () => { + const msg = JSON.parse((await mockServer.nextMessage) as string); + expect(msg).toEqual({ + clusterId: clusterName, + path, + query, + userId, + type: 'CLOSE', + }); + }); + + // Verify subscription is removed + const key = WebSocketManager.createKey(clusterName, path, query); + expect(WebSocketManager.activeSubscriptions.has(key)).toBe(false); + }); + + it('should handle connection errors', async () => { + // Close the server to simulate connection failure + await mockServer.close(); + + // Attempt to subscribe should fail + await expect( + WebSocketManager.subscribe(clusterName, '/api/v1/pods', 'watch=true', onMessage) + ).rejects.toThrow('WebSocket connection failed'); + }); + + it('should handle duplicate subscriptions', async () => { + const path = '/api/v1/pods'; + const query = 'watch=true'; + + // Create two subscriptions with the same parameters + const onMessage2 = vi.fn(); + await WebSocketManager.subscribe(clusterName, path, query, onMessage); + await WebSocketManager.subscribe(clusterName, path, query, onMessage2); + + await mockServer.connected; + + // Should only receive one subscription message + const subMsg = JSON.parse((await mockServer.nextMessage) as string); + expect(subMsg.type).toBe('REQUEST'); + + // Send a message + const podData = { kind: 'Pod', metadata: { name: 'test-pod' } }; + await mockServer.send( + JSON.stringify({ + clusterId: clusterName, + path, + query, + data: JSON.stringify(podData), + type: 'DATA', + }) + ); + + // Both handlers should receive the message + await vi.waitFor(() => { + expect(onMessage).toHaveBeenCalledWith(podData); + expect(onMessage2).toHaveBeenCalledWith(podData); + }); + }); + + it('should debounce unsubscribe operations', async () => { + const path = '/api/v1/pods'; + const query = 'watch=true'; + + const cleanup = await WebSocketManager.subscribe(clusterName, path, query, onMessage); + await mockServer.connected; + + // Skip subscription message + await mockServer.nextMessage; + + // Unsubscribe + cleanup(); + + // Subscribe again immediately + await WebSocketManager.subscribe(clusterName, path, query, onMessage); + + // Wait for potential unsubscribe message + await vi.waitFor(() => { + const key = WebSocketManager.createKey(clusterName, path, query); + expect(WebSocketManager.activeSubscriptions.has(key)).toBe(true); + }); + + // Verify no CLOSE message was sent + try { + const msg = JSON.parse((await mockServer.nextMessage) as string); + expect(msg.type).not.toBe('CLOSE'); + } catch (e) { + // No message is also acceptable + } + }); + }); + + describe('useWebSocket hook', () => { + it('should not connect when disabled', () => { + renderHook(() => + useWebSocket({ + url: () => '/api/v1/pods', + enabled: false, + cluster: clusterName, + onMessage, + onError, + }) + ); + + expect(WebSocketManager.socketMultiplexer).toBeNull(); + }); + + it('should handle successful connection and messages', async () => { + const fullUrl = `${BASE_WS_URL}api/v1/pods`; + + renderHook(() => + useWebSocket({ + url: () => fullUrl, + enabled: true, + cluster: clusterName, + onMessage, + onError, + }) + ); + + await mockServer.connected; + + // Skip subscription message + await mockServer.nextMessage; + + // Send test message + const podData = { kind: 'Pod', metadata: { name: 'test-pod' } }; + await mockServer.send( + JSON.stringify({ + clusterId: clusterName, + path: '/api/v1/pods', + data: JSON.stringify(podData), + type: 'DATA', + }) + ); + + await vi.waitFor(() => { + expect(onMessage).toHaveBeenCalledWith(podData); + }); + }, 10000); + + it('should handle connection errors', async () => { + const fullUrl = `${BASE_WS_URL}api/v1/pods`; + + // Close the server to simulate connection failure + await mockServer.close(); + + renderHook(() => + useWebSocket({ + url: () => fullUrl, + enabled: true, + cluster: clusterName, + onMessage, + onError, + }) + ); + + await vi.waitFor(() => { + expect(onError).toHaveBeenCalled(); + }); + }); + + it('should cleanup on unmount', async () => { + const fullUrl = `${BASE_WS_URL}api/v1/pods`; + + const { unmount } = renderHook(() => + useWebSocket({ + url: () => fullUrl, + enabled: true, + cluster: clusterName, + onMessage, + onError, + }) + ); + + await mockServer.connected; + + // Skip subscription message + await mockServer.nextMessage; + + // Unmount and wait for cleanup + unmount(); + + await vi.waitFor( + async () => { + const msg = JSON.parse((await mockServer.nextMessage) as string); + expect(msg.type).toBe('CLOSE'); + }, + { timeout: 10000 } + ); + }); + }); +}); diff --git a/frontend/src/lib/k8s/api/v2/webSocket.ts b/frontend/src/lib/k8s/api/v2/webSocket.ts index 975e6751a7..e8c12162a6 100644 --- a/frontend/src/lib/k8s/api/v2/webSocket.ts +++ b/frontend/src/lib/k8s/api/v2/webSocket.ts @@ -1,64 +1,91 @@ -import { useEffect, useMemo } from 'react'; -import { getUserIdFromLocalStorage } from '../../../../stateless'; -import { KubeObjectInterface } from '../../KubeObject'; +import { useCallback, useEffect, useMemo } from 'react'; +import { findKubeconfigByClusterName, getUserIdFromLocalStorage } from '../../../../stateless'; +import { getToken } from '../../../auth'; +import { getCluster } from '../../../cluster'; import { BASE_HTTP_URL } from './fetch'; -import { KubeListUpdateEvent } from './KubeList'; +import { makeUrl } from './makeUrl'; // Constants for WebSocket connection export const BASE_WS_URL = BASE_HTTP_URL.replace('http', 'ws'); + /** * Multiplexer endpoint for WebSocket connections + * This endpoint allows multiple subscriptions over a single connection */ const MULTIPLEXER_ENDPOINT = 'wsMultiplexer'; -// Message types for WebSocket communication +/** + * Message format for WebSocket communication between client and server. + * Used to manage subscriptions to Kubernetes resource updates. + */ interface WebSocketMessage { - /** Cluster ID */ + /** + * Cluster identifier used to route messages to the correct Kubernetes cluster. + * This is particularly important in multi-cluster environments. + */ clusterId: string; - /** API path */ + + /** + * API resource path that identifies the Kubernetes resource being watched. + * Example: '/api/v1/pods' or '/apis/apps/v1/deployments' + */ path: string; - /** Query parameters */ + + /** + * Query parameters for filtering or modifying the watch request. + * Example: 'labelSelector=app%3Dnginx&fieldSelector=status.phase%3DRunning' + */ query: string; - /** User ID */ + + /** + * User identifier for authentication and authorization. + * Used to ensure users only receive updates for resources they have access to. + */ userId: string; - /** Message type */ + + /** + * Message type that indicates the purpose of the message: + * - REQUEST: Client is requesting to start watching a resource + * - CLOSE: Client wants to stop watching a resource + * - COMPLETE: Server indicates the watch request has completed (e.g., due to timeout or error) + */ type: 'REQUEST' | 'CLOSE' | 'COMPLETE'; } /** * WebSocket manager to handle connections across the application. * Provides a singleton-like interface for managing WebSocket connections, - * subscriptions, and message handling. + * subscriptions, and message handling. Implements connection multiplexing + * to optimize network usage. */ export const WebSocketManager = { /** Current WebSocket connection instance */ - socket: null as WebSocket | null, + socketMultiplexer: null as WebSocket | null, /** Flag to track if a connection attempt is in progress */ connecting: false, - /** Map of message handlers for each subscription path - * Key format: clusterId:path:query - * Value: Set of callback functions for that subscription - */ + /** Flag to track if we're reconnecting after a disconnect */ + isReconnecting: false, + + /** Map of message handlers for each subscription path */ listeners: new Map void>>(), - /** Set of paths that have received a COMPLETE message - * Used to prevent processing further messages for completed paths - */ + /** Set of paths that have received a COMPLETE message */ completedPaths: new Set(), - /** Set of active WebSocket subscriptions to prevent duplicates - * Keys are in format: clusterId:path:query - */ - activeSubscriptions: new Set(), + /** Map of active WebSocket subscriptions with their details */ + activeSubscriptions: new Map(), + + /** Map to track pending unsubscribe operations for debouncing */ + pendingUnsubscribes: new Map(), /** * Creates a unique key for identifying WebSocket subscriptions - * @param clusterId - The ID of the Kubernetes cluster - * @param path - The API path being watched - * @param query - Query parameters for the subscription - * @returns A unique string key in format clusterId:path:query + * @param clusterId - Cluster identifier + * @param path - API resource path + * @param query - Query parameters + * @returns Unique subscription key */ createKey(clusterId: string, path: string, query: string): string { return `${clusterId}:${path}:${query}`; @@ -79,22 +106,21 @@ export const WebSocketManager = { * but that adds complexity and potential race conditions to handle. * The current polling approach, while not perfect, is simple and mostly reliable. * - * @returns Promise resolving to WebSocket instance - * @throws Error if connection fails + * @returns Promise resolving to WebSocket connection */ async connect(): Promise { // Return existing connection if available - if (this.socket?.readyState === WebSocket.OPEN) { - return this.socket; + if (this.socketMultiplexer?.readyState === WebSocket.OPEN) { + return this.socketMultiplexer; } - // Wait for existing connection attempt to complete + // Wait for existing connection attempt if in progress if (this.connecting) { return new Promise(resolve => { const checkConnection = setInterval(() => { - if (this.socket?.readyState === WebSocket.OPEN) { + if (this.socketMultiplexer?.readyState === WebSocket.OPEN) { clearInterval(checkConnection); - resolve(this.socket); + resolve(this.socketMultiplexer); } }, 100); }); @@ -107,14 +133,19 @@ export const WebSocketManager = { const socket = new WebSocket(wsUrl); socket.onopen = () => { - this.socket = socket; + this.socketMultiplexer = socket; this.connecting = false; + + // Only resubscribe if we're reconnecting after a disconnect + if (this.isReconnecting) { + this.resubscribeAll(socket); + } + this.isReconnecting = false; + resolve(socket); }; - socket.onmessage = (event: MessageEvent) => { - this.handleWebSocketMessage(event); - }; + socket.onmessage = this.handleWebSocketMessage.bind(this); socket.onerror = event => { console.error('WebSocket error:', event); @@ -129,84 +160,29 @@ export const WebSocketManager = { }, /** - * Handles incoming WebSocket messages - * Parses messages and distributes them to appropriate listeners - * @param event - Raw WebSocket message event + * Resubscribes all active subscriptions to a new socket + * @param socket - WebSocket connection to subscribe to */ - handleWebSocketMessage(event: MessageEvent): void { - try { - const data = JSON.parse(event.data); - if (!data.clusterId || !data.path) return; - - const key = this.createKey(data.clusterId, data.path, data.query || ''); - - if (data.type === 'COMPLETE') { - this.handleCompletionMessage(data, key); - return; - } - - if (this.completedPaths.has(key)) { - return; - } - - // Parse the update data - let update; - try { - update = data.data ? JSON.parse(data.data) : data; - } catch (err) { - console.error('Failed to parse update data:', err); - return; - } - - // Only notify listeners if we have a valid update - if (update && typeof update === 'object') { - this.listeners.get(key)?.forEach(listener => listener(update)); - } - } catch (err) { - console.error('Failed to process WebSocket message:', err); - } - }, - - /** - * Handles COMPLETE type messages from the server - * Marks paths as completed and sends close message - * @param data - The complete message data - * @param key - The subscription key - */ - handleCompletionMessage(data: any, key: string): void { - this.completedPaths.add(key); - if (this.socket?.readyState === WebSocket.OPEN) { - const closeMsg: WebSocketMessage = { - clusterId: data.clusterId, - path: data.path, - query: data.query || '', - userId: data.userId || '', - type: 'CLOSE', + resubscribeAll(socket: WebSocket): void { + this.activeSubscriptions.forEach(({ clusterId, path, query }) => { + const userId = getUserIdFromLocalStorage(); + const requestMsg: WebSocketMessage = { + clusterId, + path, + query, + userId: userId || '', + type: 'REQUEST', }; - this.socket.send(JSON.stringify(closeMsg)); - } - }, - - /** - * Handles WebSocket connection close events - * Implements reconnection logic with delay - */ - handleWebSocketClose(): void { - console.log('WebSocket closed, attempting reconnect...'); - this.socket = null; - this.connecting = false; - if (this.listeners.size > 0) { - setTimeout(() => this.connect(), 1000); - } + socket.send(JSON.stringify(requestMsg)); + }); }, /** - * Subscribes to WebSocket updates for a specific path - * Manages subscription lifecycle and prevents duplicates - * @param clusterId - The ID of the Kubernetes cluster to watch - * @param path - The API path to watch - * @param query - Query parameters including resourceVersion - * @param onMessage - Callback function to handle incoming messages + * Subscribe to WebSocket updates for a specific resource + * @param clusterId - Cluster identifier + * @param path - API resource path + * @param query - Query parameters + * @param onMessage - Callback for handling incoming messages * @returns Promise resolving to cleanup function */ async subscribe( @@ -217,103 +193,199 @@ export const WebSocketManager = { ): Promise<() => void> { const key = this.createKey(clusterId, path, query); - // Don't create duplicate subscriptions for the same path - if (this.activeSubscriptions.has(key)) { - if (!this.listeners.has(key)) { - this.listeners.set(key, new Set()); - } - this.listeners.get(key)!.add(onMessage); - return () => this.handleUnsubscribe(key, onMessage, null, path, query); - } + // Add to active subscriptions + this.activeSubscriptions.set(key, { clusterId, path, query }); - this.activeSubscriptions.add(key); - if (!this.listeners.has(key)) { - this.listeners.set(key, new Set()); - } - this.listeners.get(key)!.add(onMessage); + // Add message listener + const listeners = this.listeners.get(key) || new Set(); + listeners.add(onMessage); + this.listeners.set(key, listeners); + // Establish connection and send REQUEST const socket = await this.connect(); const userId = getUserIdFromLocalStorage(); - - const message: WebSocketMessage = { + const requestMsg: WebSocketMessage = { clusterId, path, query, userId: userId || '', type: 'REQUEST', }; + socket.send(JSON.stringify(requestMsg)); - socket.send(JSON.stringify(message)); - - return () => { - this.activeSubscriptions.delete(key); - this.handleUnsubscribe(key, onMessage, userId, path, query); - }; + // Return cleanup function + return () => this.unsubscribe(key, clusterId, path, query, onMessage); }, /** - * Handles cleanup when unsubscribing from a WebSocket path - * Removes listeners and closes connection if no more subscriptions - * @param key - The unique subscription key - * @param onMessage - The message handler to remove - * @param userId - The user ID associated with the subscription - * @param path - The API path being watched - * @param query - Query parameters for the subscription + * Unsubscribes from WebSocket updates with debouncing to prevent rapid subscribe/unsubscribe cycles. + * + * State Management: + * - Manages pendingUnsubscribes: Map of timeouts for delayed unsubscription + * - Manages listeners: Map of message handlers for each subscription + * - Manages activeSubscriptions: Set of currently active WebSocket subscriptions + * - Manages completedPaths: Set of paths that have completed their initial data fetch + * + * Debouncing Logic: + * 1. Clears any pending unsubscribe timeout for the subscription + * 2. Removes the message handler from listeners + * 3. If no listeners remain, sets a timeout before actually unsubscribing + * 4. Only sends CLOSE message if no new listeners are added during timeout + * + * @param key - Subscription key that uniquely identifies this subscription + * @param clusterId - Cluster identifier for routing to correct cluster + * @param path - API resource path being watched + * @param query - Query parameters for filtering + * @param onMessage - Message handler to remove from subscription */ - handleUnsubscribe( + unsubscribe( key: string, - onMessage: (data: any) => void, - userId: string | null, + clusterId: string, path: string, - query: string + query: string, + onMessage: (data: any) => void ): void { + // Clear any pending unsubscribe for this key + const pendingTimeout = this.pendingUnsubscribes.get(key); + if (pendingTimeout) { + clearTimeout(pendingTimeout); + this.pendingUnsubscribes.delete(key); + } + + // Remove the listener const listeners = this.listeners.get(key); - listeners?.delete(onMessage); - - if (listeners?.size === 0) { - this.listeners.delete(key); - this.completedPaths.delete(key); - this.activeSubscriptions.delete(key); - - if (this.socket?.readyState === WebSocket.OPEN) { - const [clusterId] = key.split(':'); - const closeMsg: WebSocketMessage = { - clusterId, - path, - query, - userId: userId || '', - type: 'CLOSE', - }; - this.socket.send(JSON.stringify(closeMsg)); + if (listeners) { + listeners.delete(onMessage); + if (listeners.size === 0) { + this.listeners.delete(key); + + // Delay unsubscription to handle rapid re-subscriptions + // This prevents unnecessary WebSocket churn when a component quickly unmounts and remounts + // For example: during route changes or component updates in React's strict mode + const timeout = setTimeout(() => { + // Only unsubscribe if there are still no listeners + if (!this.listeners.has(key)) { + this.activeSubscriptions.delete(key); + this.completedPaths.delete(key); + + if (this.socketMultiplexer?.readyState === WebSocket.OPEN) { + const userId = getUserIdFromLocalStorage(); + const closeMsg: WebSocketMessage = { + clusterId, + path, + query, + userId: userId || '', + type: 'CLOSE', + }; + this.socketMultiplexer.send(JSON.stringify(closeMsg)); + } + } + this.pendingUnsubscribes.delete(key); + }, 100); // 100ms debounce + + this.pendingUnsubscribes.set(key, timeout); } } + }, - if (this.listeners.size === 0) { - this.socket?.close(); - this.socket = null; + /** + * Handles WebSocket connection close event + * Sets up state for potential reconnection + */ + handleWebSocketClose(): void { + this.socketMultiplexer = null; + this.connecting = false; + this.completedPaths.clear(); + + // Set reconnecting flag if we have active subscriptions + this.isReconnecting = this.activeSubscriptions.size > 0; + }, + + /** + * Handles incoming WebSocket messages + * Processes different message types and notifies appropriate listeners + * @param event - WebSocket message event + */ + handleWebSocketMessage(event: MessageEvent): void { + try { + const data = JSON.parse(event.data); + if (!data.clusterId || !data.path) { + return; + } + + const key = this.createKey(data.clusterId, data.path, data.query || ''); + + // Handle COMPLETE messages + if (data.type === 'COMPLETE') { + this.completedPaths.add(key); + return; + } + + // Skip if path is already completed + if (this.completedPaths.has(key)) { + return; + } + + // Parse and validate update data + let update; + try { + update = data.data ? JSON.parse(data.data) : data; + } catch (err) { + console.error('Failed to parse update data:', err); + return; + } + + // Notify listeners if update is valid + if (update && typeof update === 'object') { + const listeners = this.listeners.get(key); + if (listeners) { + listeners.forEach(listener => listener(update)); + } + } + } catch (err) { + console.error('Failed to process WebSocket message:', err); } }, }; +/** + * Configuration for establishing a WebSocket connection to watch Kubernetes resources. + * Used by the multiplexer to manage multiple WebSocket connections efficiently. + * + * @template T The expected type of data that will be received over the WebSocket + */ +export type WebSocketConnectionRequest = { + /** + * The Kubernetes cluster identifier to connect to. + * Used for routing WebSocket messages in multi-cluster environments. + */ + cluster: string; + + /** + * The WebSocket endpoint URL to connect to. + * Should be a full URL including protocol and any query parameters. + * Example: 'https://cluster.example.com/api/v1/pods/watch' + */ + url: string; + + /** + * Callback function that handles incoming messages from the WebSocket. + * @param data The message payload, typed as T (e.g., K8s Pod, Service, etc.) + */ + onMessage: (data: T) => void; +}; + /** * React hook for WebSocket subscription to Kubernetes resources + * @template T - Type of data expected from the WebSocket * @param options - Configuration options for the WebSocket connection * @param options.url - Function that returns the WebSocket URL to connect to * @param options.enabled - Whether the WebSocket connection should be active * @param options.cluster - The Kubernetes cluster ID to watch - * @param options.onMessage - Callback function to handle incoming Kubernetes events + * @param options.onMessage - Callback function to handle incoming messages * @param options.onError - Callback function to handle connection errors - * - * @example - * useWebSocket({ - * url: () => '/api/v1/pods?watch=1', - * enabled: true, - * cluster: 'my-cluster', - * onMessage: (event) => console.log('Pod update:', event), - * onError: (error) => console.error('WebSocket error:', error), - * }); */ -export function useWebSocket({ +export function useWebSocket({ url: createUrl, enabled = true, cluster = '', @@ -326,67 +398,230 @@ export function useWebSocket({ enabled?: boolean; /** The Kubernetes cluster ID to watch */ cluster?: string; - /** Callback function to handle incoming Kubernetes events */ - onMessage: (data: KubeListUpdateEvent) => void; + /** Callback function to handle incoming messages */ + onMessage: (data: T) => void; /** Callback function to handle connection errors */ onError?: (error: Error) => void; }) { - /** - * Memoized URL to prevent unnecessary reconnections - */ const url = useMemo(() => (enabled ? createUrl() : ''), [enabled, createUrl]); + const stableOnMessage = useCallback( + (rawData: any) => { + try { + let parsedData: T; + try { + parsedData = typeof rawData === 'string' ? JSON.parse(rawData) : rawData; + } catch (parseError) { + console.error('Failed to parse WebSocket message:', parseError); + onError?.(parseError as Error); + return; + } + + onMessage(parsedData); + } catch (err) { + console.error('Failed to process WebSocket message:', err); + onError?.(err as Error); + } + }, + [onMessage, onError] + ); + useEffect(() => { - if (!enabled || !url) return; + if (!enabled || !url) { + return; + } - const parsedUrl = new URL(url, BASE_WS_URL); let cleanup: (() => void) | undefined; - WebSocketManager.subscribe( - cluster, - parsedUrl.pathname, - parsedUrl.search.slice(1), - (update: any) => { - try { - if (isKubeListUpdateEvent(update)) { - onMessage(update); - } - } catch (err) { - console.error('Failed to process WebSocket message:', err); - onError?.(err as Error); - } - } - ).then( - unsubscribe => { - cleanup = unsubscribe; - }, - error => { - console.error('WebSocket subscription failed:', error); - onError?.(error); + const connectWebSocket = async () => { + try { + const parsedUrl = new URL(url); + cleanup = await WebSocketManager.subscribe( + cluster, + parsedUrl.pathname, + parsedUrl.search.slice(1), + stableOnMessage + ); + } catch (err) { + console.error('WebSocket connection failed:', err); + onError?.(err as Error); } - ); + }; + + connectWebSocket(); - // Cleanup function to unsubscribe when the component unmounts - // or when any of the dependencies change return () => { - cleanup?.(); + if (cleanup) { + cleanup(); + } }; - }, [enabled, url, cluster, onMessage, onError]); + }, [url, enabled, cluster, stableOnMessage, onError]); } /** - * Type guard to check if a message is a valid Kubernetes list update event - * @param data - The data to check - * @returns True if the data is a valid KubeListUpdateEvent + * Keeps track of open WebSocket connections and active listeners */ -function isKubeListUpdateEvent( - data: any -): data is KubeListUpdateEvent { - return ( - data && - typeof data === 'object' && - 'type' in data && - 'object' in data && - ['ADDED', 'MODIFIED', 'DELETED'].includes(data.type) - ); +const sockets = new Map(); +const listeners = new Map void>>(); + +/** + * Create new WebSocket connection to the backend + * + * @param url - WebSocket URL + * @param options - Connection options + * + * @returns WebSocket connection + */ +export async function openWebSocket( + url: string, + { + protocols: moreProtocols = [], + type = 'binary', + cluster = getCluster() ?? '', + onMessage, + }: { + /** + * Any additional protocols to include in WebSocket connection + */ + protocols?: string | string[]; + /** + * + */ + type: 'json' | 'binary'; + /** + * Cluster name + */ + cluster?: string; + /** + * Message callback + */ + onMessage: (data: T) => void; + } +) { + const path = [url]; + const protocols = ['base64.binary.k8s.io', ...(moreProtocols ?? [])]; + + const token = getToken(cluster); + if (token) { + const encodedToken = btoa(token).replace(/=/g, ''); + protocols.push(`base64url.bearer.authorization.k8s.io.${encodedToken}`); + } + + if (cluster) { + path.unshift('clusters', cluster); + + try { + const kubeconfig = await findKubeconfigByClusterName(cluster); + + if (kubeconfig !== null) { + const userID = getUserIdFromLocalStorage(); + protocols.push(`base64url.headlamp.authorization.k8s.io.${userID}`); + } + } catch (error) { + console.error('Error while finding kubeconfig:', error); + } + } + + const socket = new WebSocket(makeUrl([BASE_WS_URL, ...path], {}), protocols); + socket.binaryType = 'arraybuffer'; + socket.addEventListener('message', (body: MessageEvent) => { + const data = type === 'json' ? JSON.parse(body.data) : body.data; + onMessage(data); + }); + socket.addEventListener('error', error => { + console.error('WebSocket error:', error); + }); + + return socket; +} + +/** + * Creates or joins mutiple existing WebSocket connections + * + * @param url - endpoint URL + * @param options - WebSocket options + */ +export function useWebSockets({ + connections, + enabled = true, + protocols, + type = 'json', +}: { + enabled?: boolean; + /** Make sure that connections value is stable between renders */ + connections: Array>; + /** + * Any additional protocols to include in WebSocket connection + * make sure that the value is stable between renders + */ + protocols?: string | string[]; + /** + * Type of websocket data + */ + type?: 'json' | 'binary'; +}) { + useEffect(() => { + if (!enabled) return; + + let isCurrent = true; + + /** Open a connection to websocket */ + function connect({ cluster, url, onMessage }: WebSocketConnectionRequest) { + const connectionKey = cluster + url; + + if (!sockets.has(connectionKey)) { + // Add new listener for this URL + listeners.set(connectionKey, [...(listeners.get(connectionKey) ?? []), onMessage]); + + // Mark socket as pending, so we don't open more than one + sockets.set(connectionKey, 'pending'); + + let ws: WebSocket | undefined; + openWebSocket(url, { protocols, type, cluster, onMessage }) + .then(socket => { + ws = socket; + + // Hook was unmounted while it was connecting to WebSocket + // so we close the socket and clean up + if (!isCurrent) { + ws.close(); + sockets.delete(connectionKey); + return; + } + + sockets.set(connectionKey, ws); + }) + .catch(err => { + console.error(err); + }); + } + + return () => { + const connectionKey = cluster + url; + + // Clean up the listener + const newListeners = listeners.get(connectionKey)?.filter(it => it !== onMessage) ?? []; + listeners.set(connectionKey, newListeners); + + // No one is listening to the connection + // so we can close it + if (newListeners.length === 0) { + const maybeExisting = sockets.get(connectionKey); + if (maybeExisting) { + if (maybeExisting !== 'pending') { + maybeExisting.close(); + } + sockets.delete(connectionKey); + } + } + }; + } + + const disconnectCallbacks = connections.map(endpoint => connect(endpoint)); + + return () => { + isCurrent = false; + disconnectCallbacks.forEach(fn => fn()); + }; + }, [enabled, type, connections, protocols]); } diff --git a/frontend/src/plugin/__snapshots__/pluginLib.snapshot b/frontend/src/plugin/__snapshots__/pluginLib.snapshot index dafdc1d599..d5206ac49c 100644 --- a/frontend/src/plugin/__snapshots__/pluginLib.snapshot +++ b/frontend/src/plugin/__snapshots__/pluginLib.snapshot @@ -16114,4 +16114,4 @@ "registerSidebarEntry": [Function], "registerSidebarEntryFilter": [Function], "runCommand": [Function], -} \ No newline at end of file +}