Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming pagination support (streamingRequest) #31

Merged
13 changes: 13 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"chai": "^4.2.0",
"chai-subset": "^1.6.0",
"deasync": "^0.1.14",
"flush-promises": "^1.0.2",
"mocha": "^8.2.1",
"sinon": "^7.3.1",
"sinon-chai": "^3.3.0",
Expand All @@ -64,8 +65,8 @@
"autobind-decorator": "^2.4.0",
"lodash.ismatch": "^4.4.0",
"lodash.pull": "^4.1.0",
"reconnecting-websocket": "^4.4.0",
"lodash.uniqby": "^4.7.0",
"reconnecting-websocket": "^4.4.0",
"winston": "^3.3.3"
}
}
96 changes: 86 additions & 10 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
ISerializer,
IStreamingAPI,
ITransport,
StreamingRequestHandler,
SubscribeOptions,
SubscriptionHandler,
} from './interface';
Expand All @@ -35,6 +36,42 @@ interface ISubscriptionDescriptor<S, P extends S> {
unsubscribeMessage: object,
}

/**
* Promise representing the listening for responses during a streaming request, and offering an
* cancel() method to stop listening for additional responses.
*
* This is returned from DCRFClient.streamingRequest.
*/
export
class StreamingRequestPromise<T> extends Promise<T> {
protected _dispatcher: IDispatcher;
protected _listenerId: number | null;

constructor(executor: (resolve: (value?: (PromiseLike<T> | T)) => void, reject: (reason?: any) => void) => void,
dispatcher: IDispatcher, listenerId: number) {
super(executor);
this._dispatcher = dispatcher;
this._listenerId = listenerId;
}

public get listenerId() {
return this._listenerId;
}

/**
* Stop listening for new events on this subscription
* @return true if the subscription was active, false if it was already unsubscribed
*/
public async cancel(): Promise<boolean> {
if (this._listenerId !== null) {
const returnValue = this._dispatcher.cancel(this._listenerId);
this._listenerId = null;
return returnValue;
}
return false;
}
}

export
class DCRFClient implements IStreamingAPI {
public readonly dispatcher: IDispatcher;
Expand Down Expand Up @@ -266,6 +303,22 @@ class DCRFClient implements IStreamingAPI {
}
}

private sendRequest(payload: object, requestId: string, stream: string) {
payload = Object.assign({}, payload, {request_id: requestId});
if (this.options.preprocessPayload != null) {
// Note: this and the preprocessMessage handler below presume an object will be returned.
// If you really want to return a 0, you're kinda SOL -- wrap it in an object :P
payload = this.options.preprocessPayload(stream, payload, requestId) || payload;
}

let message = this.buildMultiplexedMessage(stream, payload);
if (this.options.preprocessMessage != null) {
message = this.options.preprocessMessage(message) || message;
}

this.send(message);
}

public request(stream: string, payload: object, requestId: string=UUID.generate()): Promise<any> {
return new Promise((resolve, reject) => {
const selector = this.buildRequestResponseSelector(stream, requestId);
Expand All @@ -281,21 +334,44 @@ class DCRFClient implements IStreamingAPI {
reject(response);
}
});
this.sendRequest(payload, requestId, stream);
});
}

payload = Object.assign({}, payload, {request_id: requestId});
if (this.options.preprocessPayload != null) {
// Note: this and the preprocessMessage handler below presume an object will be returned.
// If you really want to return a 0, you're kinda SOL -- wrap it in an object :P
payload = this.options.preprocessPayload(stream, payload, requestId) || payload;
}
public streamingRequest(stream: string, payload: object, callback: StreamingRequestHandler, requestId: string = UUID.generate()): StreamingRequestPromise<void> {
const selector = this.buildRequestResponseSelector(stream, requestId);

let message = this.buildMultiplexedMessage(stream, payload);
if (this.options.preprocessMessage != null) {
message = this.options.preprocessMessage(message) || message;
let cancelable: StreamingRequestPromise<void>;
let listenerId: number | null = this.dispatcher.listen(selector, (data: typeof selector & { payload: { response_status: number, data: any } }) => {
const {payload: response} = data;
const responseStatus = response.response_status;

if (!cancelable.listenerId) {
// we promise not to call callback after cancel.
return;
}

this.send(message);
// 2xx is success
if (Math.floor(responseStatus / 100) === 2) {
callback(null, response.data);
} else {
cancelable.cancel().finally(() => {
callback(response, null);
})

}
});

cancelable = new StreamingRequestPromise((resolve, reject) => {
try {
this.sendRequest(payload, requestId, stream);
resolve();
} catch (e) {
reject(e);
}
}, this.dispatcher, listenerId);

return cancelable;
}

public send(object: object) {
Expand Down
15 changes: 15 additions & 0 deletions src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ type DispatchListener<T> = (response: T) => any;
export
type SubscriptionHandler = (payload: {[prop: string]: any}, action: string) => any;

export
type StreamingRequestHandler = (error: {response_status: number, data: any} | null, payload: {[prop: string]: any} | null) => any;


/**
* Calls all handlers whose selectors match an incoming payload.
Expand Down Expand Up @@ -406,6 +409,18 @@ interface IStreamingAPI {
* On failure, the promise will be rejected with the entire API response.
*/
request(stream: string, payload: object, requestId?: string): Promise<object>;

/**
* Perform an asynchronous transaction where the result can be broken into multiple responses
*
* @param stream Name of object's type stream
* @param payload Data to send as payload
* @param callback Function to call with payload on new responses
* @param requestId Value to send as request_id to the server. If not specified,
* one will be generated.
* @return StreamingRequestCanceler function to call when deciding there will be no more responses.
*/
streamingRequest(stream: string, payload: object, callback: StreamingRequestHandler, requestId?: string): CancelablePromise<void>;
}


Expand Down
116 changes: 113 additions & 3 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import chai, {expect} from 'chai';
import sinon from 'sinon';
import sinonChai from 'sinon-chai';
import {DCRFClient} from '../src';
import flushPromises from "flush-promises";
chai.use(sinonChai);

import FifoDispatcher from '../src/dispatchers/fifo';
Expand Down Expand Up @@ -183,9 +184,7 @@ describe('DCRFClient', function() {
});

expect(transport.send).to.have.been.calledOnce;
const msg = transport.send.getCall(0).args[0];
const stream = msg.stream;
const requestId = msg.payload.request_id;
const [{stream, payload: {request_id: requestId}}] = transport.send.firstCall.args;

transport.emit('message', {
data: {
Expand Down Expand Up @@ -243,6 +242,117 @@ describe('DCRFClient', function() {
});
});

describe('streamingRequest', function() {
it('sends request and listen for responses until cancel', async function () {
const responses: any[] = [];
const cancelable = api.streamingRequest('test', {'key': 'unique'}, (error, response) => {
responses.push(response);
});

await cancelable;

expect(transport.send).to.have.been.calledOnce;
const [{stream, payload: {request_id: requestId}}] = transport.send.firstCall.args;

transport.emit('message', {
data: {
stream,
payload: {
request_id: requestId,
response_status: 200,
data: {response: 'unique'}
}
}
});

transport.emit('message', {
data: {
stream,
payload: {
request_id: requestId,
response_status: 200,
data: {response: 'unique2'}
}
}
});

expect(await cancelable.cancel()).to.be.true;

transport.emit('message', {
data: {
stream,
payload: {
request_id: requestId,
response_status: 200,
data: {response: 'unique3'}
}
}
});

expect(responses).to.deep.equal([{'response': 'unique'}, {'response': 'unique2'}]);
expect(await cancelable.cancel()).to.be.false;
});

it('cancels when receiving an error.', async function () {
const responses: any[] = [];
const errors: any[] = [];
const cancelable = api.streamingRequest('test', {'key': 'unique'}, (error, response) => {
if (error) {
errors.push(error);
} else {
responses.push(response);
}
});

await cancelable;

expect(transport.send).to.have.been.calledOnce;
const [{stream, payload: {request_id: requestId}}] = transport.send.firstCall.args;

transport.emit('message', {
data: {
stream,
payload: {
request_id: requestId,
response_status: 200,
data: {response: 'unique'}
}
}
});

transport.emit('message', {
data: {
stream,
payload: {
request_id: requestId,
response_status: 400,
data: {response: 'unique2'}
}
}
});

transport.emit('message', {
data: {
stream,
payload: {
request_id: requestId,
response_status: 200,
data: {response: 'unique3'}
}
}
});

await flushPromises();

expect(responses).to.deep.equal([{'response': 'unique'}]);
expect(errors).to.deep.equal([{
request_id: requestId,
response_status: 400,
data: {response: 'unique2'}
}]);
expect(await cancelable.cancel()).to.be.false;
});
});

describe('subscribe', function() {
it('invokes callback on every update', function() {
Expand Down