Skip to content

Commit

Permalink
Merge pull request #31 from arrai-innovations/streaming-pagination-su…
Browse files Browse the repository at this point in the history
…pport
  • Loading branch information
theY4Kman authored Sep 24, 2021
2 parents 67cdd14 + b90ec59 commit 8aee9f6
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 14 deletions.
13 changes: 13 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"chai": "^4.2.0",
"chai-subset": "^1.6.0",
"deasync": "^0.1.14",
"flush-promises": "^1.0.2",
"mocha": "^8.2.1",
"sinon": "^7.3.1",
"sinon-chai": "^3.3.0",
Expand All @@ -64,8 +65,8 @@
"autobind-decorator": "^2.4.0",
"lodash.ismatch": "^4.4.0",
"lodash.pull": "^4.1.0",
"reconnecting-websocket": "^4.4.0",
"lodash.uniqby": "^4.7.0",
"reconnecting-websocket": "^4.4.0",
"winston": "^3.3.3"
}
}
96 changes: 86 additions & 10 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
ISerializer,
IStreamingAPI,
ITransport,
StreamingRequestHandler,
SubscribeOptions,
SubscriptionHandler,
} from './interface';
Expand All @@ -35,6 +36,42 @@ interface ISubscriptionDescriptor<S, P extends S> {
unsubscribeMessage: object,
}

/**
* Promise representing the listening for responses during a streaming request, and offering an
* cancel() method to stop listening for additional responses.
*
* This is returned from DCRFClient.streamingRequest.
*/
export
class StreamingRequestPromise<T> extends Promise<T> {
protected _dispatcher: IDispatcher;
protected _listenerId: number | null;

constructor(executor: (resolve: (value?: (PromiseLike<T> | T)) => void, reject: (reason?: any) => void) => void,
dispatcher: IDispatcher, listenerId: number) {
super(executor);
this._dispatcher = dispatcher;
this._listenerId = listenerId;
}

public get listenerId() {
return this._listenerId;
}

/**
* Stop listening for new events on this subscription
* @return true if the subscription was active, false if it was already unsubscribed
*/
public async cancel(): Promise<boolean> {
if (this._listenerId !== null) {
const returnValue = this._dispatcher.cancel(this._listenerId);
this._listenerId = null;
return returnValue;
}
return false;
}
}

export
class DCRFClient implements IStreamingAPI {
public readonly dispatcher: IDispatcher;
Expand Down Expand Up @@ -266,6 +303,22 @@ class DCRFClient implements IStreamingAPI {
}
}

private sendRequest(payload: object, requestId: string, stream: string) {
payload = Object.assign({}, payload, {request_id: requestId});
if (this.options.preprocessPayload != null) {
// Note: this and the preprocessMessage handler below presume an object will be returned.
// If you really want to return a 0, you're kinda SOL -- wrap it in an object :P
payload = this.options.preprocessPayload(stream, payload, requestId) || payload;
}

let message = this.buildMultiplexedMessage(stream, payload);
if (this.options.preprocessMessage != null) {
message = this.options.preprocessMessage(message) || message;
}

this.send(message);
}

public request(stream: string, payload: object, requestId: string=UUID.generate()): Promise<any> {
return new Promise((resolve, reject) => {
const selector = this.buildRequestResponseSelector(stream, requestId);
Expand All @@ -281,21 +334,44 @@ class DCRFClient implements IStreamingAPI {
reject(response);
}
});
this.sendRequest(payload, requestId, stream);
});
}

payload = Object.assign({}, payload, {request_id: requestId});
if (this.options.preprocessPayload != null) {
// Note: this and the preprocessMessage handler below presume an object will be returned.
// If you really want to return a 0, you're kinda SOL -- wrap it in an object :P
payload = this.options.preprocessPayload(stream, payload, requestId) || payload;
}
public streamingRequest(stream: string, payload: object, callback: StreamingRequestHandler, requestId: string = UUID.generate()): StreamingRequestPromise<void> {
const selector = this.buildRequestResponseSelector(stream, requestId);

let message = this.buildMultiplexedMessage(stream, payload);
if (this.options.preprocessMessage != null) {
message = this.options.preprocessMessage(message) || message;
let cancelable: StreamingRequestPromise<void>;
let listenerId: number | null = this.dispatcher.listen(selector, (data: typeof selector & { payload: { response_status: number, data: any } }) => {
const {payload: response} = data;
const responseStatus = response.response_status;

if (!cancelable.listenerId) {
// we promise not to call callback after cancel.
return;
}

this.send(message);
// 2xx is success
if (Math.floor(responseStatus / 100) === 2) {
callback(null, response.data);
} else {
cancelable.cancel().finally(() => {
callback(response, null);
})

}
});

cancelable = new StreamingRequestPromise((resolve, reject) => {
try {
this.sendRequest(payload, requestId, stream);
resolve();
} catch (e) {
reject(e);
}
}, this.dispatcher, listenerId);

return cancelable;
}

public send(object: object) {
Expand Down
15 changes: 15 additions & 0 deletions src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ type DispatchListener<T> = (response: T) => any;
export
type SubscriptionHandler = (payload: {[prop: string]: any}, action: string) => any;

export
type StreamingRequestHandler = (error: {response_status: number, data: any} | null, payload: {[prop: string]: any} | null) => any;


/**
* Calls all handlers whose selectors match an incoming payload.
Expand Down Expand Up @@ -406,6 +409,18 @@ interface IStreamingAPI {
* On failure, the promise will be rejected with the entire API response.
*/
request(stream: string, payload: object, requestId?: string): Promise<object>;

/**
* Perform an asynchronous transaction where the result can be broken into multiple responses
*
* @param stream Name of object's type stream
* @param payload Data to send as payload
* @param callback Function to call with payload on new responses
* @param requestId Value to send as request_id to the server. If not specified,
* one will be generated.
* @return StreamingRequestCanceler function to call when deciding there will be no more responses.
*/
streamingRequest(stream: string, payload: object, callback: StreamingRequestHandler, requestId?: string): CancelablePromise<void>;
}


Expand Down
116 changes: 113 additions & 3 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import chai, {expect} from 'chai';
import sinon from 'sinon';
import sinonChai from 'sinon-chai';
import {DCRFClient} from '../src';
import flushPromises from "flush-promises";
chai.use(sinonChai);

import FifoDispatcher from '../src/dispatchers/fifo';
Expand Down Expand Up @@ -183,9 +184,7 @@ describe('DCRFClient', function() {
});

expect(transport.send).to.have.been.calledOnce;
const msg = transport.send.getCall(0).args[0];
const stream = msg.stream;
const requestId = msg.payload.request_id;
const [{stream, payload: {request_id: requestId}}] = transport.send.firstCall.args;

transport.emit('message', {
data: {
Expand Down Expand Up @@ -243,6 +242,117 @@ describe('DCRFClient', function() {
});
});

describe('streamingRequest', function() {
it('sends request and listen for responses until cancel', async function () {
const responses: any[] = [];
const cancelable = api.streamingRequest('test', {'key': 'unique'}, (error, response) => {
responses.push(response);
});

await cancelable;

expect(transport.send).to.have.been.calledOnce;
const [{stream, payload: {request_id: requestId}}] = transport.send.firstCall.args;

transport.emit('message', {
data: {
stream,
payload: {
request_id: requestId,
response_status: 200,
data: {response: 'unique'}
}
}
});

transport.emit('message', {
data: {
stream,
payload: {
request_id: requestId,
response_status: 200,
data: {response: 'unique2'}
}
}
});

expect(await cancelable.cancel()).to.be.true;

transport.emit('message', {
data: {
stream,
payload: {
request_id: requestId,
response_status: 200,
data: {response: 'unique3'}
}
}
});

expect(responses).to.deep.equal([{'response': 'unique'}, {'response': 'unique2'}]);
expect(await cancelable.cancel()).to.be.false;
});

it('cancels when receiving an error.', async function () {
const responses: any[] = [];
const errors: any[] = [];
const cancelable = api.streamingRequest('test', {'key': 'unique'}, (error, response) => {
if (error) {
errors.push(error);
} else {
responses.push(response);
}
});

await cancelable;

expect(transport.send).to.have.been.calledOnce;
const [{stream, payload: {request_id: requestId}}] = transport.send.firstCall.args;

transport.emit('message', {
data: {
stream,
payload: {
request_id: requestId,
response_status: 200,
data: {response: 'unique'}
}
}
});

transport.emit('message', {
data: {
stream,
payload: {
request_id: requestId,
response_status: 400,
data: {response: 'unique2'}
}
}
});

transport.emit('message', {
data: {
stream,
payload: {
request_id: requestId,
response_status: 200,
data: {response: 'unique3'}
}
}
});

await flushPromises();

expect(responses).to.deep.equal([{'response': 'unique'}]);
expect(errors).to.deep.equal([{
request_id: requestId,
response_status: 400,
data: {response: 'unique2'}
}]);
expect(await cancelable.cancel()).to.be.false;
});
});

describe('subscribe', function() {
it('invokes callback on every update', function() {
Expand Down

0 comments on commit 8aee9f6

Please sign in to comment.