Skip to content

Commit

Permalink
[fix] [broker] Make the new exclusive consumer instead the inactive o…
Browse files Browse the repository at this point in the history
…ne faster (apache#21183)

This change addresses an issue similar to the one fixed by apache#21155.

The client assumed the connection was inactive, but the broker assumed the connection was fine. The client tried to use a new connection to reconnect an exclusive consumer, and then got the error `Exclusive consumer is already connected`.

- Check whether the connection of the old consumer is still alive when a new consumer tries to subscribe

(cherry picked from commit 29db8f8)
(cherry picked from commit b796f56)
  • Loading branch information
poorbarcode authored and mukesh-ctds committed Apr 15, 2024
1 parent 981a82a commit 82f77bc
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,22 @@ public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
}

if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
return FutureUtil.failedFuture(new ConsumerBusyException("Exclusive consumer is already connected"));
Consumer actConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (actConsumer != null) {
return actConsumer.cnx().checkConnectionLiveness().thenCompose(actConsumerStillAlive -> {
if (actConsumerStillAlive == null || actConsumerStillAlive) {
return FutureUtil.failedFuture(new ConsumerBusyException("Exclusive consumer is already"
+ " connected"));
} else {
return addConsumer(consumer);
}
});
} else {
// It should never happen.

return FutureUtil.failedFuture(new ConsumerBusyException("Active consumer is in a strange state."
+ " Active consumer is null, but there are " + consumers.size() + " registered."));
}
}

if (subscriptionType == SubType.Failover && isConsumersExceededOnSubscription()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ public void setup() throws Exception {
doReturn(spy(DefaultEventLoop.class)).when(channel).eventLoop();
doReturn(channel).when(ctx).channel();
doReturn(ctx).when(serverCnx).ctx();
doReturn(CompletableFuture.completedFuture(true)).when(serverCnx).checkConnectionLiveness();

NamespaceService nsSvc = mock(NamespaceService.class);
NamespaceBundle bundle = mock(NamespaceBundle.class);
Expand Down Expand Up @@ -685,7 +686,15 @@ public void testSubscribeUnsubscribe() throws Exception {
f1.get();

// 2. duplicate subscribe
Future<Consumer> f2 = topic.subscribe(getSubscriptionOption(cmd));
CommandSubscribe cmd2 = new CommandSubscribe()
.setConsumerId(2)
.setTopic(successTopicName)
.setSubscription(successSubName)
.setConsumerName("consumer-name")
.setReadCompacted(false)
.setRequestId(2)
.setSubType(SubType.Exclusive);
Future<Consumer> f2 = topic.subscribe(getSubscriptionOption(cmd2));
try {
f2.get();
fail("should fail with exception");
Expand Down Expand Up @@ -750,19 +759,11 @@ public void testAddRemoveConsumer() throws Exception {
sub.addConsumer(consumer);
assertTrue(sub.getDispatcher().isConsumerConnected());

// 2. duplicate add consumer
try {
sub.addConsumer(consumer).get();
fail("Should fail with ConsumerBusyException");
} catch (Exception e) {
assertTrue(e.getCause() instanceof BrokerServiceException.ConsumerBusyException);
}

// 3. simple remove consumer
// 2. simple remove consumer
sub.removeConsumer(consumer);
assertFalse(sub.getDispatcher().isConsumerConnected());

// 4. duplicate remove consumer
// 3. duplicate remove consumer
try {
sub.removeConsumer(consumer);
fail("Should fail with ServerMetadataException");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import lombok.AllArgsConstructor;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
Expand Down Expand Up @@ -104,6 +105,7 @@
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.AuthMethod;
import org.apache.pulsar.common.api.proto.BaseCommand;
Expand Down Expand Up @@ -1070,6 +1072,98 @@ public void testProducerChangeSocket() throws Exception {
channel2.close();
}

/**
 * Verifies that an exclusive consumer subscribing over a second connection replaces the exclusive
 * consumer registered over the first connection, once the first channel has been added to
 * {@code channelsStoppedAnswerHealthCheck}.
 */
@Test
public void testHandleConsumerAfterClientChannelInactive() throws Exception {
    final String tName = successTopicName;
    final long consumerId = 1;
    final MutableInt requestId = new MutableInt(1);
    final String sName = successSubName;
    final String cName1 = ConsumerName.generateRandomName();
    final String cName2 = ConsumerName.generateRandomName();
    resetChannel();
    setChannelConnected();

    // Declared outside the try block so that the finally block can release whatever was created,
    // even when an assertion fails half-way through the test.
    ClientChannel channel2 = null;
    BackGroundExecutor backGroundExecutor = null;
    try {
        // The first consumer registers using the first connection.
        ByteBuf cmdSubscribe1 = Commands.newSubscribe(tName, sName, consumerId, requestId.incrementAndGet(),
                SubType.Exclusive, 0, cName1, 0);
        channel.writeInbound(cmdSubscribe1);
        assertTrue(getResponse() instanceof CommandSuccess);
        // Use orElse(null) so that a missing topic is reported by assertNotNull instead of a
        // NoSuchElementException thrown by Optional.get().
        PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(tName).orElse(null);
        assertNotNull(topicRef);
        assertNotNull(topicRef.getSubscription(sName).getConsumers());
        assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1);
        assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(), cName1);

        // Verify that the second consumer, using a new connection, overrides the consumer that is
        // using a channel which stopped answering health checks.
        channelsStoppedAnswerHealthCheck.add(channel);
        channel2 = new ClientChannel();
        setChannelConnected(channel2.serverCnx);
        ByteBuf cmdSubscribe2 = Commands.newSubscribe(tName, sName, consumerId, requestId.incrementAndGet(),
                CommandSubscribe.SubType.Exclusive, 0, cName2, 0);
        channel2.channel.writeInbound(cmdSubscribe2);
        backGroundExecutor = startBackgroundExecutorForEmbeddedChannel(channel);

        assertTrue(getResponse(channel2.channel, channel2.clientChannelHelper) instanceof CommandSuccess);
        assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1);
        assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(), cName2);
    } finally {
        // cleanup (same order as before: executor first, then the channels).
        if (backGroundExecutor != null) {
            backGroundExecutor.close();
        }
        channel.finish();
        if (channel2 != null) {
            channel2.close();
        }
    }
}

/**
 * Verifies that, when the connection liveness check is disabled (timeout set to {@code -1}), a
 * second exclusive consumer subscribing over a new connection is still rejected with
 * "Exclusive consumer is already connected" and the first consumer stays registered.
 */
@Test
public void test2ndSubFailedIfDisabledConCheck()
        throws Exception {
    final String tName = successTopicName;
    final long consumerId = 1;
    final MutableInt requestId = new MutableInt(1);
    final String sName = successSubName;
    final String cName1 = ConsumerName.generateRandomName();
    final String cName2 = ConsumerName.generateRandomName();
    // Disabled connection check.
    pulsar.getConfig().setConnectionLivenessCheckTimeoutMillis(-1);

    // Declared outside the try block so that the finally block can release whatever was created,
    // even when an assertion fails half-way through the test.
    ClientChannel channel2 = null;
    BackGroundExecutor backGroundExecutor = null;
    try {
        resetChannel();
        setChannelConnected();

        // The first consumer registers using the first connection.
        ByteBuf cmdSubscribe1 = Commands.newSubscribe(tName, sName, consumerId, requestId.incrementAndGet(),
                SubType.Exclusive, 0, cName1, 0);
        channel.writeInbound(cmdSubscribe1);
        assertTrue(getResponse() instanceof CommandSuccess);
        PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(tName).orElse(null);
        assertNotNull(topicRef);
        assertNotNull(topicRef.getSubscription(sName).getConsumers());
        assertEquals(topicRef.getSubscription(sName).getConsumers().stream().map(Consumer::consumerName)
                .collect(Collectors.toList()), Collections.singletonList(cName1));

        // Try to subscribe a second consumer using a new connection while the first channel has
        // stopped answering health checks.
        channelsStoppedAnswerHealthCheck.add(channel);
        channel2 = new ClientChannel();
        setChannelConnected(channel2.serverCnx);
        ByteBuf cmdSubscribe2 = Commands.newSubscribe(tName, sName, consumerId, requestId.incrementAndGet(),
                CommandSubscribe.SubType.Exclusive, 0, cName2, 0);
        channel2.channel.writeInbound(cmdSubscribe2);
        backGroundExecutor = startBackgroundExecutorForEmbeddedChannel(channel);

        // Since the feature "ConnectionLiveness" has been disabled, the fix from
        // https://github.com/apache/pulsar/pull/21183 does not take effect, so the client still gets an error
        // and the first consumer remains the only registered one.
        Object responseOfConnection2 = getResponse(channel2.channel, channel2.clientChannelHelper);
        assertTrue(responseOfConnection2 instanceof CommandError);
        assertTrue(((CommandError) responseOfConnection2).getMessage()
                .contains("Exclusive consumer is already connected"));
        assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1);
        assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(), cName1);
    } finally {
        try {
            // cleanup (same order as before: executor first, then the channels).
            if (backGroundExecutor != null) {
                backGroundExecutor.close();
            }
            channel.finish();
            if (channel2 != null) {
                channel2.close();
            }
        } finally {
            // Reset configuration even if the test or the cleanup failed, so that the disabled
            // liveness check does not leak into other tests sharing this configuration.
            // NOTE(review): 5000 is the value this test has always restored; presumably the default - confirm.
            pulsar.getConfig().setConnectionLivenessCheckTimeoutMillis(5000);
        }
    }
}

/**
* When a channel typed "EmbeddedChannel", once we call channel.execute(runnable), there is no background thread
* to run it.
Expand Down Expand Up @@ -1913,9 +2007,11 @@ public void testDuplicateConcurrentSubscribeCommand() throws Exception {
"test" /* consumer name */, 0 /* avoid reseting cursor */);
channel.writeInbound(clientCommand);

BackGroundExecutor backGroundExecutor = startBackgroundExecutorForEmbeddedChannel(channel);

// Create producer second time
clientCommand = Commands.newSubscribe(successTopicName, //
successSubName, 2 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
successSubName, 2 /* consumer id */, 2 /* request id */, SubType.Exclusive, 0,
"test" /* consumer name */, 0 /* avoid reseting cursor */);
channel.writeInbound(clientCommand);

Expand All @@ -1925,6 +2021,9 @@ public void testDuplicateConcurrentSubscribeCommand() throws Exception {
CommandError error = (CommandError) response;
assertEquals(error.getError(), ServerError.ConsumerBusy);
});

// cleanup.
backGroundExecutor.close();
channel.finish();
}

Expand Down Expand Up @@ -2779,13 +2878,7 @@ protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper client
if (channelsStoppedAnswerHealthCheck.contains(channel)) {
continue;
}
channel.writeAndFlush(Commands.newPong()).addListener(future -> {
if (!future.isSuccess()) {
log.warn("[{}] Forcing connection to close since cannot send a pong message.",
channel, future.cause());
channel.close();
}
});
channel.writeInbound(Commands.newPong());
continue;
}
return cmd;
Expand Down

0 comments on commit 82f77bc

Please sign in to comment.