Skip to content

Commit

Permalink
Fixed asynchronous send backpressure capability
Browse files Browse the repository at this point in the history
  • Loading branch information
guyinyou committed Sep 4, 2023
1 parent f82718a commit 3d7ad2f
Showing 1 changed file with 53 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -547,27 +547,62 @@ public void send(Message msg,
@Deprecated
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
BackpressureSendCallBack newCallBack = new BackpressureSendCallBack(sendCallback);

final long beginStartTime = System.currentTimeMillis();
Runnable runnable = new Runnable() {
@Override
public void run() {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) {
try {
sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
sendDefaultImpl(msg, CommunicationMode.ASYNC, newCallBack, timeout - costTime);
} catch (Exception e) {
sendCallback.onException(e);
newCallBack.onException(e);
}
} else {
sendCallback.onException(
newCallBack.onException(
new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
}
}
};
executeAsyncMessageSend(runnable, msg, sendCallback, timeout, beginStartTime);
executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
}

public void executeAsyncMessageSend(Runnable runnable, final Message msg, final SendCallback sendCallback,
class BackpressureSendCallBack implements SendCallback {
public boolean isSemaphoreAsyncSizeAquired = false;
public boolean isSemaphoreAsyncNumAquired = false;
public int msgLen;
private final SendCallback sendCallback;

public BackpressureSendCallBack(final SendCallback sendCallback) {
this.sendCallback = sendCallback;
}

@Override
public void onSuccess(SendResult sendResult) {
if (isSemaphoreAsyncSizeAquired) {
semaphoreAsyncSendSize.release(msgLen);
}
if (isSemaphoreAsyncNumAquired) {
semaphoreAsyncSendNum.release();
}
sendCallback.onSuccess(sendResult);
}

@Override
public void onException(Throwable e) {
if (isSemaphoreAsyncSizeAquired) {
semaphoreAsyncSendSize.release(msgLen);
}
if (isSemaphoreAsyncNumAquired) {
semaphoreAsyncSendNum.release();
}
sendCallback.onException(e);
}
}

public void executeAsyncMessageSend(Runnable runnable, final Message msg, final BackpressureSendCallBack sendCallback,
final long timeout, final long beginStartTime)
throws MQClientException, InterruptedException {
ExecutorService executor = this.getAsyncSenderExecutor();
Expand Down Expand Up @@ -595,23 +630,17 @@ public void executeAsyncMessageSend(Runnable runnable, final Message msg, final
return;
}
}

sendCallback.isSemaphoreAsyncSizeAquired = isSemaphoreAsyncSizeAquired;
sendCallback.isSemaphoreAsyncNumAquired = isSemaphoreAsyncNumAquired;
sendCallback.msgLen = msgLen;
executor.submit(runnable);
} catch (RejectedExecutionException e) {
if (isEnableBackpressureForAsyncMode) {
runnable.run();
} else {
throw new MQClientException("executor rejected ", e);
}
} finally {
if (isSemaphoreAsyncSizeAquired) {
semaphoreAsyncSendSize.release(msgLen);
}
if (isSemaphoreAsyncNumAquired) {
semaphoreAsyncSendNum.release();
}
}

}

public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector selector, Object arg,
Expand Down Expand Up @@ -1188,7 +1217,7 @@ public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
@Deprecated
public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {

BackpressureSendCallBack newCallBack = new BackpressureSendCallBack(sendCallback);
final long beginStartTime = System.currentTimeMillis();
Runnable runnable = new Runnable() {
@Override
Expand All @@ -1203,22 +1232,22 @@ public void run() {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) {
try {
sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null,
sendKernelImpl(msg, mq, CommunicationMode.ASYNC, newCallBack, null,
timeout - costTime);
} catch (MQBrokerException e) {
throw new MQClientException("unknown exception", e);
}
} else {
sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
newCallBack.onException(new RemotingTooMuchRequestException("call timeout"));
}
} catch (Exception e) {
sendCallback.onException(e);
newCallBack.onException(e);
}
}

};

executeAsyncMessageSend(runnable, msg, sendCallback, timeout, beginStartTime);
executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
}

/**
Expand Down Expand Up @@ -1315,7 +1344,7 @@ public void send(Message msg, MessageQueueSelector selector, Object arg, SendCal
public void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {

BackpressureSendCallBack newCallBack = new BackpressureSendCallBack(sendCallback);
final long beginStartTime = System.currentTimeMillis();
Runnable runnable = new Runnable() {
@Override
Expand All @@ -1324,21 +1353,21 @@ public void run() {
if (timeout > costTime) {
try {
try {
sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback,
sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, newCallBack,
timeout - costTime);
} catch (MQBrokerException e) {
throw new MQClientException("unknown exception", e);
}
} catch (Exception e) {
sendCallback.onException(e);
newCallBack.onException(e);
}
} else {
sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
newCallBack.onException(new RemotingTooMuchRequestException("call timeout"));
}
}

};
executeAsyncMessageSend(runnable, msg, sendCallback, timeout, beginStartTime);
executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
}

/**
Expand Down

0 comments on commit 3d7ad2f

Please sign in to comment.