Skip to content

Commit

Permalink
[improve] [broker] Avoid subscription fenced error with consumer.seek…
Browse files Browse the repository at this point in the history
… whenever possible (apache#23163)

(cherry picked from commit d5ce1ce)
(cherry picked from commit bbe67c8)
  • Loading branch information
poorbarcode authored and nikhil-ctds committed Aug 16, 2024
1 parent ec1012d commit f49b5c9
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
private final PendingAckHandle pendingAckHandle;
private volatile Map<String, String> subscriptionProperties;
private volatile CompletableFuture<Void> fenceFuture;
private volatile CompletableFuture<Void> inProgressResetCursorFuture;

static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
Expand Down Expand Up @@ -220,6 +221,16 @@ public boolean setReplicated(boolean replicated) {

@Override
public CompletableFuture<Void> addConsumer(Consumer consumer) {
CompletableFuture<Void> inProgressResetCursorFuture = this.inProgressResetCursorFuture;
if (inProgressResetCursorFuture != null) {
return inProgressResetCursorFuture.handle((ignore, ignoreEx) -> null)
.thenCompose(ignore -> addConsumerInternal(consumer));
} else {
return addConsumerInternal(consumer);
}
}

private CompletableFuture<Void> addConsumerInternal(Consumer consumer) {
return pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> {
synchronized (PersistentSubscription.this) {
cursor.updateLastActive();
Expand Down Expand Up @@ -747,7 +758,8 @@ public void findEntryComplete(Position position, Object ctx) {
} else {
finalPosition = position.getNext();
}
resetCursor(finalPosition, future);
CompletableFuture<Void> resetCursorFuture = resetCursor(finalPosition);
FutureUtil.completeAfter(future, resetCursorFuture);
}

@Override
Expand All @@ -766,18 +778,13 @@ public void findEntryFailed(ManagedLedgerException exception,
}

@Override
public CompletableFuture<Void> resetCursor(Position position) {
CompletableFuture<Void> future = new CompletableFuture<>();
resetCursor(position, future);
return future;
}

private void resetCursor(Position finalPosition, CompletableFuture<Void> future) {
public CompletableFuture<Void> resetCursor(Position finalPosition) {
if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) {
future.completeExceptionally(new SubscriptionBusyException("Failed to fence subscription"));
return;
return CompletableFuture.failedFuture(new SubscriptionBusyException("Failed to fence subscription"));
}

final CompletableFuture<Void> future = new CompletableFuture<>();
inProgressResetCursorFuture = future;
final CompletableFuture<Void> disconnectFuture;

// Lock the Subscription object before locking the Dispatcher object to avoid deadlocks
Expand All @@ -797,6 +804,7 @@ private void resetCursor(Position finalPosition, CompletableFuture<Void> future)
if (throwable != null) {
log.error("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
inProgressResetCursorFuture = null;
future.completeExceptionally(
new SubscriptionBusyException("Failed to disconnect consumers from subscription"));
return;
Expand Down Expand Up @@ -835,6 +843,7 @@ public void resetComplete(Object ctx) {
dispatcher.cursorIsReset();
}
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
inProgressResetCursorFuture = null;
future.complete(null);
}

Expand All @@ -843,6 +852,7 @@ public void resetFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}][{}] Failed to reset subscription to position {}", topicName, subName,
finalPosition, exception);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
inProgressResetCursorFuture = null;
// todo - retry on InvalidCursorPositionException
// or should we just ask user to retry one more time?
if (exception instanceof InvalidCursorPositionException) {
Expand All @@ -857,10 +867,12 @@ public void resetFailed(ManagedLedgerException exception, Object ctx) {
}).exceptionally((e) -> {
log.error("[{}][{}] Error while resetting cursor", topicName, subName, e);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
inProgressResetCursorFuture = null;
future.completeExceptionally(new BrokerServiceException(e));
return null;
});
});
return future;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
Expand All @@ -50,8 +52,12 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -781,6 +787,64 @@ public void testSeekByFunctionAndMultiTopic() throws Exception {
assertEquals(count, (msgInTopic1Partition0 + msgInTopic1Partition1 + msgInTopic1Partition2) * 2);
}

@Test
public void testSeekWillNotEncounteredFencedError() throws Exception {
String topicName = "persistent://prop/ns-abc/my-topic2";
admin.topics().createNonPartitionedTopic(topicName);
admin.topicPolicies().setRetention(topicName, new RetentionPolicies(3600, 0));
// Create a pulsar client with a subscription fenced counter.
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
AtomicInteger receivedFencedErrorCounter = new AtomicInteger();
PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) ->
new ClientCnx(conf, eventLoopGroup) {
protected void handleError(CommandError error) {
if (error.getMessage() != null && error.getMessage().contains("Subscription is fenced")) {
receivedFencedErrorCounter.incrementAndGet();
}
super.handleError(error);
}
});

// publish some messages.
org.apache.pulsar.client.api.Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("s1")
.subscribe();
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topicName).create();
MessageIdImpl msgId1 = (MessageIdImpl) producer.send("0");
for (int i = 1; i < 11; i++) {
admin.topics().unload(topicName);
producer.send(i + "");
}

// Inject a delay for reset-cursor.
mockZooKeeper.delay(3000, (op, path) -> {
if (path.equals("/managed-ledgers/prop/ns-abc/persistent/my-topic2/s1")) {
return op.toString().equalsIgnoreCase("SET");
}
return false;
});

// Verify: consumer will not receive "subscription fenced" error after a seek.
for (int i = 1; i < 11; i++) {
Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
assertNotNull(msg);
consumer.acknowledge(msg);
}
consumer.seek(msgId1);
Awaitility.await().untilAsserted(() -> {
assertTrue(consumer.isConnected());
});
assertEquals(receivedFencedErrorCounter.get(), 0);

// cleanup.
producer.close();
consumer.close();
client.close();
admin.topics().delete(topicName);
}

@Test
public void testExceptionBySeekFunction() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
Expand Down

0 comments on commit f49b5c9

Please sign in to comment.