Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Oct 25, 2024
1 parent fb8573a commit 93c5b31
Showing 1 changed file with 18 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,16 @@ static GrpcGetDataStream create(
processHeartbeatResponses);
}

private static WindmillStreamShutdownException shutdownException(QueuedBatch batch) {
return new WindmillStreamShutdownException(
"Stream was closed when attempting to send " + batch.requestsCount() + " requests.");
}

private static WindmillStreamShutdownException shutdownException(QueuedRequest request) {
return new WindmillStreamShutdownException(
"Cannot send request=[" + request + "] on closed stream.");
}

@Override
protected synchronized void onNewStream() {
if (isShutdown()) {
Expand Down Expand Up @@ -349,34 +359,21 @@ private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<Respon
"Cannot send request=[" + request + "] on closed stream.");
}

private void handleShutdown(QueuedRequest request, Throwable... causes) {
private void handleShutdown(QueuedRequest request, Throwable cause) {
if (isShutdown()) {
WindmillStreamShutdownException shutdownException =
new WindmillStreamShutdownException(
"Cannot send request=[" + request + "] on closed stream.");

for (Throwable cause : causes) {
shutdownException.addSuppressed(cause);
}

WindmillStreamShutdownException shutdownException = shutdownException(request);
shutdownException.addSuppressed(cause);
throw shutdownException;
}
}

private void handleShutdown(QueuedBatch batch) {
if (isShutdown()) {
throw new WindmillStreamShutdownException(
"Stream was closed when attempting to send " + batch.requestsCount() + " requests.");
}
}

private void queueRequestAndWait(QueuedRequest request) throws InterruptedException {
QueuedBatch batch;
boolean responsibleForSend = false;
@Nullable QueuedBatch prevBatch = null;
synchronized (shutdownLock) {
if (isShutdown()) {
handleShutdown(request);
throw shutdownException(request);
}

batch = batches.isEmpty() ? null : batches.getLast();
Expand Down Expand Up @@ -405,7 +402,7 @@ private void queueRequestAndWait(QueuedRequest request) throws InterruptedExcept
// queue so that a subsequent batch will wait for its completion.
synchronized (shutdownLock) {
if (isShutdown()) {
handleShutdown(batch);
throw shutdownException(batch);
}

verify(batch == batches.peekFirst(), "GetDataStream request batch removed before send().");
Expand All @@ -423,7 +420,7 @@ void trySendBatch(QueuedBatch batch) {
sendBatch(batch);
synchronized (shutdownLock) {
if (isShutdown()) {
handleShutdown(batch);
throw shutdownException(batch);
}

verify(
Expand All @@ -434,7 +431,6 @@ void trySendBatch(QueuedBatch batch) {
// of the next batch (if one exists).
batch.notifySent();
} catch (Exception e) {
LOG.error("Error occurred sending batch.", e);
// Free waiters if the send() failed.
batch.notifyFailed();
// Propagate the exception to the calling thread.
Expand All @@ -447,16 +443,14 @@ private void sendBatch(QueuedBatch batch) {
return;
}

StreamingGetDataRequest batchedRequest = batch.asGetDataRequest();
synchronized (shutdownLock) {
// Synchronization of pending inserts is necessary with send to ensure duplicates are not
// sent on stream reconnect.
synchronized (this) {
// shutdown() clears pending, once the stream is shutdown, prevent values from being added
// to it.
if (isShutdown()) {
throw new WindmillStreamShutdownException(
"Stream was closed when attempting to send " + batch.requestsCount() + " requests.");
throw shutdownException(batch);
}

for (QueuedRequest request : batch.requestsReadOnly()) {
Expand All @@ -470,7 +464,7 @@ private void sendBatch(QueuedBatch batch) {
}

try {
send(batchedRequest);
send(batch.asGetDataRequest());
} catch (IllegalStateException e) {
// The stream broke before this call went through; onNewStream will retry the fetch.
LOG.warn("GetData stream broke before call started.", e);
Expand Down

0 comments on commit 93c5b31

Please sign in to comment.