diff --git a/package-lock.json b/package-lock.json index e8aa304..0a1a42e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -33,6 +33,7 @@ "chai": "^4.2.0", "chai-subset": "^1.6.0", "deasync": "^0.1.14", + "flush-promises": "^1.0.2", "mocha": "^8.2.1", "sinon": "^7.3.1", "sinon-chai": "^3.3.0", @@ -1117,6 +1118,12 @@ "flat": "cli.js" } }, + "node_modules/flush-promises": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/flush-promises/-/flush-promises-1.0.2.tgz", + "integrity": "sha512-G0sYfLQERwKz4+4iOZYQEZVpOt9zQrlItIxQAAYAWpfby3gbHrx0osCHz5RLl/XoXevXk0xoN4hDFky/VV9TrA==", + "dev": true + }, "node_modules/fn.name": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/fn.name/-/fn.name-1.1.0.tgz", @@ -3645,6 +3652,12 @@ "integrity": "sha512-b6suED+5/3rTpUBdG1gupIl8MPFCAMA0QXwmljLhvCUKcUvdE4gWky9zpuGCcXHOsz4J9wPGNWq6OKpmIzz3hQ==", "dev": true }, + "flush-promises": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/flush-promises/-/flush-promises-1.0.2.tgz", + "integrity": "sha512-G0sYfLQERwKz4+4iOZYQEZVpOt9zQrlItIxQAAYAWpfby3gbHrx0osCHz5RLl/XoXevXk0xoN4hDFky/VV9TrA==", + "dev": true + }, "fn.name": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/fn.name/-/fn.name-1.1.0.tgz", diff --git a/package.json b/package.json index eed905e..7e2881c 100644 --- a/package.json +++ b/package.json @@ -51,6 +51,7 @@ "chai": "^4.2.0", "chai-subset": "^1.6.0", "deasync": "^0.1.14", + "flush-promises": "^1.0.2", "mocha": "^8.2.1", "sinon": "^7.3.1", "sinon-chai": "^3.3.0", @@ -64,8 +65,8 @@ "autobind-decorator": "^2.4.0", "lodash.ismatch": "^4.4.0", "lodash.pull": "^4.1.0", - "reconnecting-websocket": "^4.4.0", "lodash.uniqby": "^4.7.0", + "reconnecting-websocket": "^4.4.0", "winston": "^3.3.3" } } diff --git a/src/index.ts b/src/index.ts index 4e598e3..27f9b42 100644 --- a/src/index.ts +++ b/src/index.ts @@ -13,6 +13,7 @@ import { ISerializer, IStreamingAPI, ITransport, + StreamingRequestHandler, SubscribeOptions, SubscriptionHandler, } from './interface'; @@ -35,6 +36,42 @@ interface ISubscriptionDescriptor { unsubscribeMessage: object, } +/** + * Promise representing the listening for responses during a streaming request, and offering an + * cancel() method to stop listening for additional responses. + * + * This is returned from DCRFClient.streamingRequest. + */ +export +class StreamingRequestPromise extends Promise { + protected _dispatcher: IDispatcher; + protected _listenerId: number | null; + + constructor(executor: (resolve: (value?: (PromiseLike | T)) => void, reject: (reason?: any) => void) => void, + dispatcher: IDispatcher, listenerId: number) { + super(executor); + this._dispatcher = dispatcher; + this._listenerId = listenerId; + } + + public get listenerId() { + return this._listenerId; + } + + /** + * Stop listening for new events on this subscription + * @return true if the subscription was active, false if it was already unsubscribed + */ + public async cancel(): Promise { + if (this._listenerId !== null) { + const returnValue = this._dispatcher.cancel(this._listenerId); + this._listenerId = null; + return returnValue; + } + return false; + } +} + export class DCRFClient implements IStreamingAPI { public readonly dispatcher: IDispatcher; @@ -266,6 +303,22 @@ class DCRFClient implements IStreamingAPI { } } + private sendRequest(payload: object, requestId: string, stream: string) { + payload = Object.assign({}, payload, {request_id: requestId}); + if (this.options.preprocessPayload != null) { + // Note: this and the preprocessMessage handler below presume an object will be returned. + // If you really want to return a 0, you're kinda SOL -- wrap it in an object :P + payload = this.options.preprocessPayload(stream, payload, requestId) || payload; + } + + let message = this.buildMultiplexedMessage(stream, payload); + if (this.options.preprocessMessage != null) { + message = this.options.preprocessMessage(message) || message; + } + + this.send(message); + } + public request(stream: string, payload: object, requestId: string=UUID.generate()): Promise { return new Promise((resolve, reject) => { const selector = this.buildRequestResponseSelector(stream, requestId); @@ -281,21 +334,44 @@ class DCRFClient implements IStreamingAPI { reject(response); } }); + this.sendRequest(payload, requestId, stream); + }); + } - payload = Object.assign({}, payload, {request_id: requestId}); - if (this.options.preprocessPayload != null) { - // Note: this and the preprocessMessage handler below presume an object will be returned. - // If you really want to return a 0, you're kinda SOL -- wrap it in an object :P - payload = this.options.preprocessPayload(stream, payload, requestId) || payload; - } + public streamingRequest(stream: string, payload: object, callback: StreamingRequestHandler, requestId: string = UUID.generate()): StreamingRequestPromise { + const selector = this.buildRequestResponseSelector(stream, requestId); - let message = this.buildMultiplexedMessage(stream, payload); - if (this.options.preprocessMessage != null) { - message = this.options.preprocessMessage(message) || message; + let cancelable: StreamingRequestPromise; + let listenerId: number | null = this.dispatcher.listen(selector, (data: typeof selector & { payload: { response_status: number, data: any } }) => { + const {payload: response} = data; + const responseStatus = response.response_status; + + if (!cancelable.listenerId) { + // we promise not to call callback after cancel. + return; } - this.send(message); + // 2xx is success + if (Math.floor(responseStatus / 100) === 2) { + callback(null, response.data); + } else { + cancelable.cancel().finally(() => { + callback(response, null); + }) + + } }); + + cancelable = new StreamingRequestPromise((resolve, reject) => { + try { + this.sendRequest(payload, requestId, stream); + resolve(); + } catch (e) { + reject(e); + } + }, this.dispatcher, listenerId); + + return cancelable; } public send(object: object) { diff --git a/src/interface.ts b/src/interface.ts index dc37e04..b20c7a3 100644 --- a/src/interface.ts +++ b/src/interface.ts @@ -11,6 +11,9 @@ type DispatchListener = (response: T) => any; export type SubscriptionHandler = (payload: {[prop: string]: any}, action: string) => any; +export +type StreamingRequestHandler = (error: {response_status: number, data: any} | null, payload: {[prop: string]: any} | null) => any; + /** * Calls all handlers whose selectors match an incoming payload. @@ -406,6 +409,18 @@ interface IStreamingAPI { * On failure, the promise will be rejected with the entire API response. */ request(stream: string, payload: object, requestId?: string): Promise; + + /** + * Perform an asynchronous transaction where the result can be broken into multiple responses + * + * @param stream Name of object's type stream + * @param payload Data to send as payload + * @param callback Function to call with payload on new responses + * @param requestId Value to send as request_id to the server. If not specified, + * one will be generated. + * @return StreamingRequestCanceler function to call when deciding there will be no more responses. + */ + streamingRequest(stream: string, payload: object, callback: StreamingRequestHandler, requestId?: string): CancelablePromise; } diff --git a/test/test.ts b/test/test.ts index 5987720..95b6e0b 100644 --- a/test/test.ts +++ b/test/test.ts @@ -4,6 +4,7 @@ import chai, {expect} from 'chai'; import sinon from 'sinon'; import sinonChai from 'sinon-chai'; import {DCRFClient} from '../src'; +import flushPromises from "flush-promises"; chai.use(sinonChai); import FifoDispatcher from '../src/dispatchers/fifo'; @@ -183,9 +184,7 @@ describe('DCRFClient', function() { }); expect(transport.send).to.have.been.calledOnce; - const msg = transport.send.getCall(0).args[0]; - const stream = msg.stream; - const requestId = msg.payload.request_id; + const [{stream, payload: {request_id: requestId}}] = transport.send.firstCall.args; transport.emit('message', { data: { @@ -243,6 +242,117 @@ describe('DCRFClient', function() { }); }); + describe('streamingRequest', function() { + it('sends request and listen for responses until cancel', async function () { + const responses: any[] = []; + const cancelable = api.streamingRequest('test', {'key': 'unique'}, (error, response) => { + responses.push(response); + }); + + await cancelable; + + expect(transport.send).to.have.been.calledOnce; + const [{stream, payload: {request_id: requestId}}] = transport.send.firstCall.args; + + transport.emit('message', { + data: { + stream, + payload: { + request_id: requestId, + response_status: 200, + data: {response: 'unique'} + } + } + }); + + transport.emit('message', { + data: { + stream, + payload: { + request_id: requestId, + response_status: 200, + data: {response: 'unique2'} + } + } + }); + + expect(await cancelable.cancel()).to.be.true; + + transport.emit('message', { + data: { + stream, + payload: { + request_id: requestId, + response_status: 200, + data: {response: 'unique3'} + } + } + }); + + expect(responses).to.deep.equal([{'response': 'unique'}, {'response': 'unique2'}]); + expect(await cancelable.cancel()).to.be.false; + }); + + it('cancels when receiving an error.', async function () { + const responses: any[] = []; + const errors: any[] = []; + const cancelable = api.streamingRequest('test', {'key': 'unique'}, (error, response) => { + if (error) { + errors.push(error); + } else { + responses.push(response); + } + }); + + await cancelable; + + expect(transport.send).to.have.been.calledOnce; + const [{stream, payload: {request_id: requestId}}] = transport.send.firstCall.args; + + transport.emit('message', { + data: { + stream, + payload: { + request_id: requestId, + response_status: 200, + data: {response: 'unique'} + } + } + }); + + transport.emit('message', { + data: { + stream, + payload: { + request_id: requestId, + response_status: 400, + data: {response: 'unique2'} + } + } + }); + + transport.emit('message', { + data: { + stream, + payload: { + request_id: requestId, + response_status: 200, + data: {response: 'unique3'} + } + } + }); + + await flushPromises(); + + expect(responses).to.deep.equal([{'response': 'unique'}]); + expect(errors).to.deep.equal([{ + request_id: requestId, + response_status: 400, + data: {response: 'unique2'} + }]); + expect(await cancelable.cancel()).to.be.false; + }); + }); describe('subscribe', function() { it('invokes callback on every update', function() {