Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming pagination support (streamingRequest) #31

Merged
56 changes: 45 additions & 11 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import {
ISerializer,
IStreamingAPI,
ITransport,
RequestMultipleCancel,
RequestMultipleHandler,
SubscribeOptions,
SubscriptionHandler,
} from './interface';
Expand Down Expand Up @@ -266,6 +268,22 @@ class DCRFClient implements IStreamingAPI {
}
}

private sendRequest(payload: object, requestId: string, stream: string) {
payload = Object.assign({}, payload, {request_id: requestId});
if (this.options.preprocessPayload != null) {
// Note: this and the preprocessMessage handler below presume an object will be returned.
// If you really want to return a 0, you're kinda SOL -- wrap it in an object :P
payload = this.options.preprocessPayload(stream, payload, requestId) || payload;
}

let message = this.buildMultiplexedMessage(stream, payload);
if (this.options.preprocessMessage != null) {
message = this.options.preprocessMessage(message) || message;
}

this.send(message);
}

public request(stream: string, payload: object, requestId: string=UUID.generate()): Promise<any> {
return new Promise((resolve, reject) => {
const selector = this.buildRequestResponseSelector(stream, requestId);
Expand All @@ -281,21 +299,37 @@ class DCRFClient implements IStreamingAPI {
reject(response);
}
});
this.sendRequest(payload, requestId, stream);
});
}

payload = Object.assign({}, payload, {request_id: requestId});
if (this.options.preprocessPayload != null) {
// Note: this and the preprocessMessage handler below presume an object will be returned.
// If you really want to return a 0, you're kinda SOL -- wrap it in an object :P
payload = this.options.preprocessPayload(stream, payload, requestId) || payload;
}
public requestMultiple(stream: string, payload: object, callback: RequestMultipleHandler, requestId: string = UUID.generate()): RequestMultipleCancel {
const selector = this.buildRequestResponseSelector(stream, requestId);

let message = this.buildMultiplexedMessage(stream, payload);
if (this.options.preprocessMessage != null) {
message = this.options.preprocessMessage(message) || message;
}
let listenerId: number | null = this.dispatcher.listen(selector, (data: typeof selector & { payload: { response_status: number, data: any } }) => {
const {payload: response} = data;
const responseStatus = response.response_status;

this.send(message);
// 2xx is success
if (Math.floor(responseStatus / 100) === 2) {
callback(null, response.data);
} else {
if (listenerId) {
this.dispatcher.cancel(listenerId);
listenerId = null;
}
callback(response, null);
}
});

this.sendRequest(payload, requestId, stream);

return () => {
if (listenerId) {
this.dispatcher.cancel(listenerId);
listenerId = null;
}
};
}

public send(object: object) {
Expand Down
7 changes: 7 additions & 0 deletions src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ type DispatchListener<T> = (response: T) => any;
export
type SubscriptionHandler = (payload: {[prop: string]: any}, action: string) => any;

export
type RequestMultipleHandler = (error: {response_status: number, data: any} | null, payload: {[prop: string]: any} | null) => any;

export
type RequestMultipleCancel = () => void;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd pair better with Handler to have this be RequestMultipleCanceler. And I suppose the object being described is not technically a "cancel", but a thing that causes cancellation, so a "canceler".

Also, in learning from previous mistakes, it may be wise to have this return a Promise which, in the future, could have the ability to send a "stop" request to the server before resolving. It definitely makes sense to immediately (i.e. not on the next tick) remove the listener, though, to guarantee that once cancel() is called, callback will not be invoked again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a StreamingRequestPromise to replace StreamingRequestCanceler as the return type for DCRFClient#streamingRequest in 88d1b53.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In b90ec59, I'm being more certain that callback is not called after cancel;


Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mmm, actually, do you think streamingRequest makes more sense than requestMultiple? I'm concerned about the ambiguity of requestMultiple — like, does it make multiple requests? Is it requesting multiple objects? Or is it what it actually does: make a single request for multiple responses?

Perhaps DCRFClient.streamingRequest() with StreamingRequestHandler and StreamingRequestCanceler? (I was toying with DCRFClient.requestStreaming(), but I'm leaning toward streamingRequest.) What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made this name change in 635369f.

/**
* Calls all handlers whose selectors match an incoming payload.
Expand Down Expand Up @@ -406,6 +411,8 @@ interface IStreamingAPI {
* On failure, the promise will be rejected with the entire API response.
*/
request(stream: string, payload: object, requestId?: string): Promise<object>;

requestMultiple(stream: string, payload: object, callback:RequestMultipleHandler, requestId?: string): RequestMultipleCancel;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: spacing

Suggested change
requestMultiple(stream: string, payload: object, callback:RequestMultipleHandler, requestId?: string): RequestMultipleCancel;
requestMultiple(stream: string, payload: object, callback: RequestMultipleHandler, requestId?: string): RequestMultipleCancel;

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see some JSDocs for the method before this gets released, but that can wait.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some JSDocs for this in 635369f.

}


Expand Down
108 changes: 108 additions & 0 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,114 @@ describe('DCRFClient', function() {
});
});

describe('requestMultiple', function() {
it('sends request and listen for responses until cancel', function () {
const responses: any[] = [];
const cancel = api.requestMultiple('test', {'key': 'unique'}, (error, response) => {
responses.push(response);
});

expect(transport.send).to.have.been.calledOnce;
const msg = transport.send.getCall(0).args[0];
const stream = msg.stream;
const requestId = msg.payload.request_id;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has nothing to do with this PR; I'm just now realizing how I could've done all of these three lines much more succinctly with a single destructuring assignment. This implementation I went with is so... Pythonic :P

If I wrote these tests again, I'd do it like this:

const [{stream, payload: {request_id: requestId}}] = transport.send.firstCall.args;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made this refactor in 635369f.


transport.emit('message', {
data: {
stream,
payload: {
request_id: requestId,
response_status: 200,
data: {response: 'unique'}
}
}
});

transport.emit('message', {
data: {
stream,
payload: {
request_id: requestId,
response_status: 200,
data: {response: 'unique2'}
}
}
});

cancel();

transport.emit('message', {
data: {
stream,
payload: {
request_id: requestId,
response_status: 200,
data: {response: 'unique3'}
}
}
});

expect(responses).to.deep.equal([{'response': 'unique'}, {'response': 'unique2'}]);
});

it('cancels when receiving an error.', function () {
const responses: any[] = [];
const errors: any[] = [];
const cancel = api.requestMultiple('test', {'key': 'unique'}, (error, response) => {
if (error) {
errors.push(error);
} else {
responses.push(response);
}
});

expect(transport.send).to.have.been.calledOnce;
const msg = transport.send.getCall(0).args[0];
const stream = msg.stream;
const requestId = msg.payload.request_id;

transport.emit('message', {
data: {
stream,
payload: {
request_id: requestId,
response_status: 200,
data: {response: 'unique'}
}
}
});

transport.emit('message', {
data: {
stream,
payload: {
request_id: requestId,
response_status: 400,
data: {response: 'unique2'}
}
}
});

transport.emit('message', {
data: {
stream,
payload: {
request_id: requestId,
response_status: 200,
data: {response: 'unique3'}
}
}
});

expect(responses).to.deep.equal([{'response': 'unique'}]);
expect(errors).to.deep.equal([{
request_id: requestId,
response_status: 400,
data: {response: 'unique2'}
}]);
expect(cancel).to.not.throw();
});
});

describe('subscribe', function() {
it('invokes callback on every update', function() {
Expand Down