KREST-2746 Reflect slow responses from Kafka back to the http client #1043
Conversation
@dimitarndimitrov The failing unit test I mentioned is testWriteToChunkedOutputAfterTimeout (although the new 429 test fails in the same way too; I've not been testing that one).
Force-pushed from baaf2ea to d3ef908.
@dimitarndimitrov Ignore the first commit; I hadn't tidied up properly. This one shows the hang behaviour from EasyMock, where the close on the mappingIterator never returns. The logs look like this:

With the debugger running it looks like this instead, and the test passes. There is a difference (after I said there wasn't) :) the debug line writing to the sink. That line only appears if I have a breakpoint in the async write-to-sink method (e.g. line 471); otherwise I see the same behaviour with and without the debugger.
So that's a nice big clue I'm going to go investigate :)
Force-pushed from 700f956 to 3c4fe45.
Force-pushed from 3c4fe45 to 9a95641.
@dimitarndimitrov @AndrewJSchofield This PR could do with merging before mid-November if possible, so that we can deal with Kafka back pressure before increasing (or removing) any byte-based rate limits for produce. It would be great if I could get some feedback by the end of October.
@@ -196,12 +216,24 @@ public final void resume(AsyncResponse asyncResponse) {
          CompletableFuture.completedFuture(
              ResultOrError.error(EXCEPTION_MAPPER.toErrorResponse(e))));
    } finally {
      // if there are still outstanding response to send back, for example hasNext has returned
Remove the MAX_CLOSE_RETRIES
private void triggerDelayedClose(
    ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
  if (executorService == null) {
    executorService = Executors.newSingleThreadScheduledExecutor();
executorService is null here, so this is not an object reference we can mutate in place; assigning to the parameter only changes the local variable. You need to, e.g., return the executor service and have the caller set its original field to the returned value.
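A minimal sketch of the shape being suggested, reusing the names from the snippet above with the scheduling body elided (not the PR's actual code):

```java
private ScheduledExecutorService triggerDelayedClose(
    ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
  if (executorService == null) {
    // Reassigning the parameter only updates this method's local copy,
    // so the new executor must be handed back to the caller.
    executorService = Executors.newSingleThreadScheduledExecutor();
  }
  // ... schedule the delayed close of responseQueue as before ...
  return executorService;
}

// Caller side (hypothetical field name):
// this.executorService = triggerDelayedClose(this.executorService, responseQueue);
```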
@@ -347,14 +438,16 @@ private void asyncResume(AsyncResponse asyncResponse) {
     asyncResponse.resume(Response.ok(sink).build());
   }

-  private volatile boolean sinkClosed = false;
+  volatile boolean sinkClosed = false;
Why is this no longer private?
-  private volatile boolean sinkClosed = false;
+  volatile boolean sinkClosed = false;

+  private volatile AtomicInteger queueDepth = new AtomicInteger(0);
Shouldn't be volatile
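For illustration, the declaration the comment appears to be asking for (assuming the field reference itself is never reassigned):

```java
// AtomicInteger already provides atomic, visible updates; only the field
// reference would need volatile, and it never changes, so final suffices.
private final AtomicInteger queueDepth = new AtomicInteger(0);
```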
@@ -839,7 +841,7 @@ private static ProduceAction getProduceAction(
     replay(produceControllerProvider, produceController);

     StreamingResponseFactory streamingResponseFactory =
-        new StreamingResponseFactory(chunkedOutputFactory, FIVE_SECONDS_MS, FIVE_SECONDS_MS);
+        new StreamingResponseFactory(chunkedOutputFactory, FIVE_SECONDS_MS, FIVE_MS, DEPTH, DEPTH);
Rename these to something like DURATION or GRACE_DURATION etc.; at the moment you need to go and look up the values to tell them apart.
// no third message
expect(requestsMappingIterator.hasNext()).andReturn(false);

requestsMappingIterator.close(); // call from thread executor
You can't wrap a call that returns void in expect(), so these calls have to be recorded and played back one by one :'(
Add a comment explaining this, because it looks bonkers otherwise.
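For context, EasyMock records expectations on a void method by invoking it directly while the mock is in record mode, optionally followed by expectLastCall(). A rough illustration using the iterator mock from the snippet above:

```java
// expect(...) only accepts calls that return a value, so a void method such
// as close() is recorded by simply calling it on the mock in record mode.
requestsMappingIterator.close();
// Optionally attach details (call count, andThrow, etc.) to that last call.
expectLastCall().once();
```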
expect(mockedChunkedOutput.isClosed()).andReturn(false);
mockedChunkedOutput.write(sucessResult);
Two c's in success.
I have reviewed the code with @ehumber and am happy to approve. I do not think this is a sensible way to handle streamed REST produce requests which overrun the KafkaProducer request limits, but this does at least prevent heap exhaustion when it happens. I think there's a significant refactor due in this code at some point.
Sorry I didn't manage to get this tidied up in time :(. I made a PR from a branch in confluentinc/kafka-rest, so if my original branch from my fork goes missing when I leave, hopefully you still have the code. For this PR I recommend you don't merge it until you need to. That will most probably be when you look at rate limiting around REST produce, and potentially remove it in REST and rely on backpressure from Kafka to keep the requests at a sensible limit.
This PR adds code to reflect Kafka rate limiting back to the HTTP client.
While Kafka's produce API response does tell the Java producer client that it is being throttled, that information is not exposed to the user of the client (i.e. Kafka REST), so we have to infer the throttling another way.
Kafka throttles by delaying its responses to the client, so if we see a growing backlog of requests waiting to be sent to Kafka, we can assume Kafka is throttling REST.
When this happens (or we have a backlog of calls to Kafka for some other reason), we are obliged to send responses back to the HTTP client in the same order as the requests arrived with us. So all we can do, once we hit the "throttle from this point" queue depth, is append 429 responses to the end of the response queue. The corresponding requests are not sent to Kafka, which should reduce the traffic reaching Kafka, possibly enough to bring it back under the throttle limit.
If the queue depth doesn't shrink sufficiently and instead grows to the maximum limit, then the connection is closed once the grace period from my previous (disconnect clients nicely) PR has expired.
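A rough sketch of the two thresholds described above, reusing names that appear in the review snippets (queueDepth, triggerDelayedClose, ResultOrError) and hypothetical helpers elsewhere; this is an illustration, not the PR's actual code:

```java
int depth = queueDepth.incrementAndGet();
if (depth >= disconnectQueueDepth) {            // hypothetical max-depth limit
  // The backlog kept growing: schedule the disconnect, honouring the grace
  // period introduced by the earlier "disconnect clients nicely" PR.
  executorService = triggerDelayedClose(executorService, responseQueue);
} else if (depth >= throttleQueueDepth) {       // hypothetical 429 threshold
  // Don't send this record to Kafka; instead append a 429 to the response
  // queue so responses still go back to the HTTP client in request order.
  responseQueue.push(                           // hypothetical queue method
      CompletableFuture.completedFuture(
          ResultOrError.error(tooManyRequestsErrorResponse())));
} else {
  // Below both thresholds: forward the record to Kafka as normal.
  produceToKafka(record);                       // hypothetical helper
}
```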