Skip to content

Commit

Permalink
chore: moving internal StreamingProcessor class from js-sdk-common to…
Browse files Browse the repository at this point in the history
… js-server-sdk-common (#700)

**Requirements**

- [ ] I have added test coverage for new or changed functionality
No behavioral changes, existing tests migrated with class.

- [x] I have followed the repository's [pull request submission
guidelines](../blob/main/CONTRIBUTING.md#submitting-pull-requests)

- [ ] I have validated my changes against all supported platform
versions

**Related issues**

SDK-156, prepping for SDK-849
  • Loading branch information
tanderson-ld authored Dec 4, 2024
1 parent 8351aca commit 7266baa
Show file tree
Hide file tree
Showing 12 changed files with 55 additions and 42 deletions.
2 changes: 2 additions & 0 deletions packages/shared/common/src/datasource/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ export class LDStreamingError extends Error {
this.recoverable = recoverable;
}
}

export type StreamingErrorHandler = (err: LDStreamingError) => void;
15 changes: 13 additions & 2 deletions packages/shared/common/src/datasource/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
import { DataSourceErrorKind } from './DataSourceErrorKinds';
import { LDFileDataSourceError, LDPollingError, LDStreamingError } from './errors';
import {
LDFileDataSourceError,
LDPollingError,
LDStreamingError,
StreamingErrorHandler,
} from './errors';

export { DataSourceErrorKind, LDFileDataSourceError, LDPollingError, LDStreamingError };
export {
DataSourceErrorKind,
LDFileDataSourceError,
LDPollingError,
LDStreamingError,
StreamingErrorHandler,
};
2 changes: 2 additions & 0 deletions packages/shared/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
LDFileDataSourceError,
LDPollingError,
LDStreamingError,
StreamingErrorHandler,
} from './datasource';

export * from './api';
Expand All @@ -24,5 +25,6 @@ export {
DataSourceErrorKind,
LDPollingError,
LDStreamingError,
StreamingErrorHandler,
LDFileDataSourceError,
};
1 change: 0 additions & 1 deletion packages/shared/common/src/internal/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,3 @@ export * from './context';
export * from './diagnostics';
export * from './evaluation';
export * from './events';
export * from './stream';
5 changes: 0 additions & 5 deletions packages/shared/common/src/internal/stream/index.ts

This file was deleted.

3 changes: 0 additions & 3 deletions packages/shared/common/src/internal/stream/types.ts

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
ProcessStreamResponse,
Requests,
shouldRetry,
StreamingErrorHandler,
subsystem,
} from '@launchdarkly/js-sdk-common';

Expand All @@ -23,7 +24,7 @@ const reportJsonError = (
type: string,
data: string,
logger?: LDLogger,
errorHandler?: internal.StreamingErrorHandler,
errorHandler?: StreamingErrorHandler,
) => {
logger?.error(`Stream received invalid data in "${type}" message`);
logger?.debug(`Invalid JSON follows: ${data}`);
Expand All @@ -47,7 +48,7 @@ class StreamingProcessor implements subsystem.LDStreamProcessor {
encoding: Encoding,
private readonly _pollingRequestor: Requestor,
private readonly _diagnosticsManager?: internal.DiagnosticsManager,
private readonly _errorHandler?: internal.StreamingErrorHandler,
private readonly _errorHandler?: StreamingErrorHandler,
private readonly _logger?: LDLogger,
) {
let path: string;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import { EventName, Info, LDLogger, ProcessStreamResponse } from '../../../src/api';
import { LDStreamProcessor } from '../../../src/api/subsystem';
import { DataSourceErrorKind } from '../../../src/datasource/DataSourceErrorKinds';
import { LDStreamingError } from '../../../src/datasource/errors';
import { DiagnosticsManager } from '../../../src/internal/diagnostics';
import StreamingProcessor from '../../../src/internal/stream/StreamingProcessor';
import { defaultHeaders } from '../../../src/utils';
import { createBasicPlatform } from '../../createBasicPlatform';
import {
DataSourceErrorKind,
defaultHeaders,
EventName,
Info,
internal,
LDLogger,
LDStreamingError,
ProcessStreamResponse,
subsystem,
} from '@launchdarkly/js-sdk-common';

import StreamingProcessor from '../../src/data_sources/StreamingProcessor';
import { createBasicPlatform } from '../createBasicPlatform';

let logger: LDLogger;

Expand Down Expand Up @@ -61,8 +67,8 @@ const createMockEventSource = (streamUri: string = '', options: any = {}) => ({

describe('given a stream processor with mock event source', () => {
let info: Info;
let streamingProcessor: LDStreamProcessor;
let diagnosticsManager: DiagnosticsManager;
let streamingProcessor: subsystem.LDStreamProcessor;
let diagnosticsManager: internal.DiagnosticsManager;
let listeners: Map<EventName, ProcessStreamResponse>;
let mockEventSource: any;
let mockListener: ProcessStreamResponse;
Expand Down Expand Up @@ -104,7 +110,7 @@ describe('given a stream processor with mock event source', () => {
listeners.set('put', mockListener);
listeners.set('patch', mockListener);

diagnosticsManager = new DiagnosticsManager(sdkKey, basicPlatform, {});
diagnosticsManager = new internal.DiagnosticsManager(sdkKey, basicPlatform, {});
streamingProcessor = new StreamingProcessor(
{
basicConfiguration: getBasicConfiguration(logger),
Expand Down
3 changes: 2 additions & 1 deletion packages/shared/sdk-server/__tests__/streamingProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type {
LDHeaders,
LDStreamingError,
ProcessStreamResponse,
StreamingErrorHandler,
} from '@launchdarkly/js-sdk-common';

export const MockStreamingProcessor = jest.fn();
Expand All @@ -29,7 +30,7 @@ export const setupMockStreamingProcessor = (
listeners: Map<EventName, ProcessStreamResponse>,
baseHeaders: LDHeaders,
diagnosticsManager: internal.DiagnosticsManager,
errorHandler: internal.StreamingErrorHandler,
errorHandler: StreamingErrorHandler,
_streamInitialReconnectDelay: number,
) => ({
start: jest.fn(async () => {
Expand Down
3 changes: 2 additions & 1 deletion packages/shared/sdk-server/src/LDClientImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import { createStreamListeners } from './data_sources/createStreamListeners';
import DataSourceUpdates from './data_sources/DataSourceUpdates';
import PollingProcessor from './data_sources/PollingProcessor';
import Requestor from './data_sources/Requestor';
import StreamingProcessor from './data_sources/StreamingProcessor';
import createDiagnosticsInitConfig from './diagnostics/createDiagnosticsInitConfig';
import { allAsync } from './evaluation/collection';
import { Flag } from './evaluation/data/Flag';
Expand Down Expand Up @@ -220,7 +221,7 @@ export default class LDClientImpl implements LDClient {
});
const makeDefaultProcessor = () =>
config.stream
? new internal.StreamingProcessor(
? new StreamingProcessor(
clientContext,
'/all',
[],
Expand Down
3 changes: 1 addition & 2 deletions packages/shared/sdk-server/src/Migration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ class Migration<
TMigrationWrite,
TMigrationReadInput = any,
TMigrationWriteInput = any,
> implements
LDMigration<TMigrationRead, TMigrationWrite, TMigrationReadInput, TMigrationWriteInput>
> implements LDMigration<TMigrationRead, TMigrationWrite, TMigrationReadInput, TMigrationWriteInput>
{
private readonly _execution: LDSerialExecution | LDConcurrentExecution;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import {
ClientContext,
DataSourceErrorKind,
EventName,
EventSource,
getStreamingUri,
httpErrorMessage,
HttpErrorResponse,
internal,
LDHeaders,
LDLogger,
LDStreamingError,
ProcessStreamResponse,
Requests,
} from '../../api';
import { LDStreamProcessor } from '../../api/subsystem';
import { DataSourceErrorKind } from '../../datasource/DataSourceErrorKinds';
import { LDStreamingError } from '../../datasource/errors';
import { ClientContext } from '../../options';
import { getStreamingUri } from '../../options/ServiceEndpoints';
import { httpErrorMessage, LDHeaders, shouldRetry } from '../../utils';
import { DiagnosticsManager } from '../diagnostics';
import { StreamingErrorHandler } from './types';
shouldRetry,
StreamingErrorHandler,
subsystem,
} from '@launchdarkly/js-sdk-common';

const reportJsonError = (
type: string,
Expand All @@ -28,8 +30,7 @@ const reportJsonError = (
);
};

// TODO: SDK-156 - Move to Server SDK specific location
class StreamingProcessor implements LDStreamProcessor {
export default class StreamingProcessor implements subsystem.LDStreamProcessor {
private readonly _headers: { [key: string]: string | string[] };
private readonly _streamUri: string;
private readonly _logger?: LDLogger;
Expand All @@ -44,7 +45,7 @@ class StreamingProcessor implements LDStreamProcessor {
parameters: { key: string; value: string }[],
private readonly _listeners: Map<EventName, ProcessStreamResponse>,
baseHeaders: LDHeaders,
private readonly _diagnosticsManager?: DiagnosticsManager,
private readonly _diagnosticsManager?: internal.DiagnosticsManager,
private readonly _errorHandler?: StreamingErrorHandler,
private readonly _streamInitialReconnectDelay = 1,
) {
Expand Down Expand Up @@ -167,5 +168,3 @@ class StreamingProcessor implements LDStreamProcessor {
this.stop();
}
}

export default StreamingProcessor;

0 comments on commit 7266baa

Please sign in to comment.