From 7266baae9e209843ca2f010ec3e5c60ae1f54f37 Mon Sep 17 00:00:00 2001 From: Todd Anderson <127344469+tanderson-ld@users.noreply.github.com> Date: Wed, 4 Dec 2024 09:27:37 -0600 Subject: [PATCH] chore: moving internal StreamingProcessor class from js-sdk-common to js-server-sdk-common (#700) **Requirements** - [ ] I have added test coverage for new or changed functionality No behavioral changes, existing tests migrated with class. - [x] I have followed the repository's [pull request submission guidelines](../blob/main/CONTRIBUTING.md#submitting-pull-requests) - [ ] I have validated my changes against all supported platform versions **Related issues** SDK-156, prepping for SDK-849 --- .../shared/common/src/datasource/errors.ts | 2 ++ .../shared/common/src/datasource/index.ts | 15 ++++++++-- packages/shared/common/src/index.ts | 2 ++ packages/shared/common/src/internal/index.ts | 1 - .../common/src/internal/stream/index.ts | 5 ---- .../common/src/internal/stream/types.ts | 3 -- .../src/streaming/StreamingProcessor.ts | 5 ++-- .../data_sources}/StreamingProcessor.test.ts | 28 +++++++++++-------- .../__tests__/streamingProcessor.ts | 3 +- .../shared/sdk-server/src/LDClientImpl.ts | 3 +- packages/shared/sdk-server/src/Migration.ts | 3 +- .../src/data_sources}/StreamingProcessor.ts | 27 +++++++++--------- 12 files changed, 55 insertions(+), 42 deletions(-) delete mode 100644 packages/shared/common/src/internal/stream/index.ts delete mode 100644 packages/shared/common/src/internal/stream/types.ts rename packages/shared/{common/__tests__/internal/stream => sdk-server/__tests__/data_sources}/StreamingProcessor.test.ts (91%) rename packages/shared/{common/src/internal/stream => sdk-server/src/data_sources}/StreamingProcessor.ts (86%) diff --git a/packages/shared/common/src/datasource/errors.ts b/packages/shared/common/src/datasource/errors.ts index ef804f7f1..f2d1f7366 100644 --- a/packages/shared/common/src/datasource/errors.ts +++ b/packages/shared/common/src/datasource/errors.ts @@ -35,3 +35,5 @@ export class LDStreamingError extends Error { this.recoverable = recoverable; } } + +export type StreamingErrorHandler = (err: LDStreamingError) => void; diff --git a/packages/shared/common/src/datasource/index.ts b/packages/shared/common/src/datasource/index.ts index f888015fb..fe4b250c5 100644 --- a/packages/shared/common/src/datasource/index.ts +++ b/packages/shared/common/src/datasource/index.ts @@ -1,4 +1,15 @@ import { DataSourceErrorKind } from './DataSourceErrorKinds'; -import { LDFileDataSourceError, LDPollingError, LDStreamingError } from './errors'; +import { + LDFileDataSourceError, + LDPollingError, + LDStreamingError, + StreamingErrorHandler, +} from './errors'; -export { DataSourceErrorKind, LDFileDataSourceError, LDPollingError, LDStreamingError }; +export { + DataSourceErrorKind, + LDFileDataSourceError, + LDPollingError, + LDStreamingError, + StreamingErrorHandler, +}; diff --git a/packages/shared/common/src/index.ts b/packages/shared/common/src/index.ts index 2d23590f0..ae3f9daf0 100644 --- a/packages/shared/common/src/index.ts +++ b/packages/shared/common/src/index.ts @@ -6,6 +6,7 @@ import { LDFileDataSourceError, LDPollingError, LDStreamingError, + StreamingErrorHandler, } from './datasource'; export * from './api'; @@ -24,5 +25,6 @@ export { DataSourceErrorKind, LDPollingError, LDStreamingError, + StreamingErrorHandler, LDFileDataSourceError, }; diff --git a/packages/shared/common/src/internal/index.ts b/packages/shared/common/src/internal/index.ts index eb2951786..27abb7993 100644 --- a/packages/shared/common/src/internal/index.ts +++ b/packages/shared/common/src/internal/index.ts @@ -2,4 +2,3 @@ export * from './context'; export * from './diagnostics'; export * from './evaluation'; export * from './events'; -export * from './stream'; diff --git a/packages/shared/common/src/internal/stream/index.ts b/packages/shared/common/src/internal/stream/index.ts deleted file mode 100644 index 82a6edfe5..000000000 --- a/packages/shared/common/src/internal/stream/index.ts +++ /dev/null @@ -1,5 +0,0 @@ -import StreamingProcessor from './StreamingProcessor'; -import type { StreamingErrorHandler } from './types'; - -export { StreamingProcessor }; -export type { StreamingErrorHandler }; diff --git a/packages/shared/common/src/internal/stream/types.ts b/packages/shared/common/src/internal/stream/types.ts deleted file mode 100644 index 4b84650e6..000000000 --- a/packages/shared/common/src/internal/stream/types.ts +++ /dev/null @@ -1,3 +0,0 @@ -import { LDStreamingError } from '../../datasource/errors'; - -export type StreamingErrorHandler = (err: LDStreamingError) => void; diff --git a/packages/shared/sdk-client/src/streaming/StreamingProcessor.ts b/packages/shared/sdk-client/src/streaming/StreamingProcessor.ts index f6aad31af..48f49858c 100644 --- a/packages/shared/sdk-client/src/streaming/StreamingProcessor.ts +++ b/packages/shared/sdk-client/src/streaming/StreamingProcessor.ts @@ -13,6 +13,7 @@ import { ProcessStreamResponse, Requests, shouldRetry, + StreamingErrorHandler, subsystem, } from '@launchdarkly/js-sdk-common'; @@ -23,7 +24,7 @@ const reportJsonError = ( type: string, data: string, logger?: LDLogger, - errorHandler?: internal.StreamingErrorHandler, + errorHandler?: StreamingErrorHandler, ) => { logger?.error(`Stream received invalid data in "${type}" message`); logger?.debug(`Invalid JSON follows: ${data}`); @@ -47,7 +48,7 @@ class StreamingProcessor implements subsystem.LDStreamProcessor { encoding: Encoding, private readonly _pollingRequestor: Requestor, private readonly _diagnosticsManager?: internal.DiagnosticsManager, - private readonly _errorHandler?: internal.StreamingErrorHandler, + private readonly _errorHandler?: StreamingErrorHandler, private readonly _logger?: LDLogger, ) { let path: string; diff --git a/packages/shared/common/__tests__/internal/stream/StreamingProcessor.test.ts b/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessor.test.ts similarity index 91% rename from packages/shared/common/__tests__/internal/stream/StreamingProcessor.test.ts rename to packages/shared/sdk-server/__tests__/data_sources/StreamingProcessor.test.ts index 293e35124..f2b7b21aa 100644 --- a/packages/shared/common/__tests__/internal/stream/StreamingProcessor.test.ts +++ b/packages/shared/sdk-server/__tests__/data_sources/StreamingProcessor.test.ts @@ -1,11 +1,17 @@ -import { EventName, Info, LDLogger, ProcessStreamResponse } from '../../../src/api'; -import { LDStreamProcessor } from '../../../src/api/subsystem'; -import { DataSourceErrorKind } from '../../../src/datasource/DataSourceErrorKinds'; -import { LDStreamingError } from '../../../src/datasource/errors'; -import { DiagnosticsManager } from '../../../src/internal/diagnostics'; -import StreamingProcessor from '../../../src/internal/stream/StreamingProcessor'; -import { defaultHeaders } from '../../../src/utils'; -import { createBasicPlatform } from '../../createBasicPlatform'; +import { + DataSourceErrorKind, + defaultHeaders, + EventName, + Info, + internal, + LDLogger, + LDStreamingError, + ProcessStreamResponse, + subsystem, +} from '@launchdarkly/js-sdk-common'; + +import StreamingProcessor from '../../src/data_sources/StreamingProcessor'; +import { createBasicPlatform } from '../createBasicPlatform'; let logger: LDLogger; @@ -61,8 +67,8 @@ const createMockEventSource = (streamUri: string = '', options: any = {}) => ({ describe('given a stream processor with mock event source', () => { let info: Info; - let streamingProcessor: LDStreamProcessor; - let diagnosticsManager: DiagnosticsManager; + let streamingProcessor: subsystem.LDStreamProcessor; + let diagnosticsManager: internal.DiagnosticsManager; let listeners: Map; let mockEventSource: any; let mockListener: ProcessStreamResponse; @@ -104,7 +110,7 @@ describe('given a stream processor with mock event source', () => { listeners.set('put', mockListener); listeners.set('patch', mockListener); - diagnosticsManager = new DiagnosticsManager(sdkKey, basicPlatform, {}); + diagnosticsManager = new internal.DiagnosticsManager(sdkKey, basicPlatform, {}); streamingProcessor = new StreamingProcessor( { basicConfiguration: getBasicConfiguration(logger), diff --git a/packages/shared/sdk-server/__tests__/streamingProcessor.ts b/packages/shared/sdk-server/__tests__/streamingProcessor.ts index 2f6e2a802..15808a3f2 100644 --- a/packages/shared/sdk-server/__tests__/streamingProcessor.ts +++ b/packages/shared/sdk-server/__tests__/streamingProcessor.ts @@ -5,6 +5,7 @@ import type { LDHeaders, LDStreamingError, ProcessStreamResponse, + StreamingErrorHandler, } from '@launchdarkly/js-sdk-common'; export const MockStreamingProcessor = jest.fn(); @@ -29,7 +30,7 @@ export const setupMockStreamingProcessor = ( listeners: Map, baseHeaders: LDHeaders, diagnosticsManager: internal.DiagnosticsManager, - errorHandler: internal.StreamingErrorHandler, + errorHandler: StreamingErrorHandler, _streamInitialReconnectDelay: number, ) => ({ start: jest.fn(async () => { diff --git a/packages/shared/sdk-server/src/LDClientImpl.ts b/packages/shared/sdk-server/src/LDClientImpl.ts index 08a25dba2..44a355158 100644 --- a/packages/shared/sdk-server/src/LDClientImpl.ts +++ b/packages/shared/sdk-server/src/LDClientImpl.ts @@ -36,6 +36,7 @@ import { createStreamListeners } from './data_sources/createStreamListeners'; import DataSourceUpdates from './data_sources/DataSourceUpdates'; import PollingProcessor from './data_sources/PollingProcessor'; import Requestor from './data_sources/Requestor'; +import StreamingProcessor from './data_sources/StreamingProcessor'; import createDiagnosticsInitConfig from './diagnostics/createDiagnosticsInitConfig'; import { allAsync } from './evaluation/collection'; import { Flag } from './evaluation/data/Flag'; @@ -220,7 +221,7 @@ export default class LDClientImpl implements LDClient { }); const makeDefaultProcessor = () => config.stream - ? new internal.StreamingProcessor( + ? new StreamingProcessor( clientContext, '/all', [], diff --git a/packages/shared/sdk-server/src/Migration.ts b/packages/shared/sdk-server/src/Migration.ts index 2a5b2a037..e59ee5e54 100644 --- a/packages/shared/sdk-server/src/Migration.ts +++ b/packages/shared/sdk-server/src/Migration.ts @@ -96,8 +96,7 @@ class Migration< TMigrationWrite, TMigrationReadInput = any, TMigrationWriteInput = any, -> implements - LDMigration +> implements LDMigration { private readonly _execution: LDSerialExecution | LDConcurrentExecution; diff --git a/packages/shared/common/src/internal/stream/StreamingProcessor.ts b/packages/shared/sdk-server/src/data_sources/StreamingProcessor.ts similarity index 86% rename from packages/shared/common/src/internal/stream/StreamingProcessor.ts rename to packages/shared/sdk-server/src/data_sources/StreamingProcessor.ts index 5ffcc90cf..e752f4863 100644 --- a/packages/shared/common/src/internal/stream/StreamingProcessor.ts +++ b/packages/shared/sdk-server/src/data_sources/StreamingProcessor.ts @@ -1,19 +1,21 @@ import { + ClientContext, + DataSourceErrorKind, EventName, EventSource, + getStreamingUri, + httpErrorMessage, HttpErrorResponse, + internal, + LDHeaders, LDLogger, + LDStreamingError, ProcessStreamResponse, Requests, -} from '../../api'; -import { LDStreamProcessor } from '../../api/subsystem'; -import { DataSourceErrorKind } from '../../datasource/DataSourceErrorKinds'; -import { LDStreamingError } from '../../datasource/errors'; -import { ClientContext } from '../../options'; -import { getStreamingUri } from '../../options/ServiceEndpoints'; -import { httpErrorMessage, LDHeaders, shouldRetry } from '../../utils'; -import { DiagnosticsManager } from '../diagnostics'; -import { StreamingErrorHandler } from './types'; + shouldRetry, + StreamingErrorHandler, + subsystem, +} from '@launchdarkly/js-sdk-common'; const reportJsonError = ( type: string, @@ -28,8 +30,7 @@ const reportJsonError = ( ); }; -// TODO: SDK-156 - Move to Server SDK specific location -class StreamingProcessor implements LDStreamProcessor { +export default class StreamingProcessor implements subsystem.LDStreamProcessor { private readonly _headers: { [key: string]: string | string[] }; private readonly _streamUri: string; private readonly _logger?: LDLogger; @@ -44,7 +45,7 @@ class StreamingProcessor implements LDStreamProcessor { parameters: { key: string; value: string }[], private readonly _listeners: Map, baseHeaders: LDHeaders, - private readonly _diagnosticsManager?: DiagnosticsManager, + private readonly _diagnosticsManager?: internal.DiagnosticsManager, private readonly _errorHandler?: StreamingErrorHandler, private readonly _streamInitialReconnectDelay = 1, ) { @@ -167,5 +168,3 @@ class StreamingProcessor implements LDStreamProcessor { this.stop(); } } - -export default StreamingProcessor;