Skip to content

Commit

Permalink
[fix][broker] Skip topic.close during unloading if the topic future f…
Browse files Browse the repository at this point in the history
…ails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer (apache#22379) (apache#22406)

(cherry picked from commit e664432)
  • Loading branch information
heesung-sn authored and nikhil-ctds committed Apr 3, 2024
1 parent 896f6c3 commit 9be9846
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.manager;

import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigning;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
Expand Down Expand Up @@ -88,14 +91,27 @@ public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture,

@Override
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
if (t != null && inFlightUnloadRequest.containsKey(serviceUnit)) {
ServiceUnitState state = ServiceUnitStateData.state(data);

if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned || state == Assigning)) {
if (log.isDebugEnabled()) {
log.debug("Skipping {} for service unit {} from the assignment command.", data, serviceUnit);
}
return;
}

if (t != null) {
if (log.isDebugEnabled()) {
log.debug("Handling {} for service unit {} with exception.", data, serviceUnit, t);
}
this.complete(serviceUnit, t);
return;
}
ServiceUnitState state = ServiceUnitStateData.state(data);

if (log.isDebugEnabled()) {
log.debug("Handling {} for service unit {}", data, serviceUnit);
}

switch (state) {
case Free, Owned -> this.complete(serviceUnit, t);
default -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2270,9 +2270,18 @@ private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit
}
closeFutures.add(topicFuture
.thenCompose(t -> t.isPresent() ? t.get().close(closeWithoutWaitingClientDisconnect)
: CompletableFuture.completedFuture(null)));
: CompletableFuture.completedFuture(null))
.exceptionally(e -> {
if (e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException
&& e.getMessage().contains("Please redo the lookup")) {
log.warn("[{}] Topic ownership check failed. Skipping it", topicName);
return null;
}
throw FutureUtil.wrapToCompletionException(e);
}));
}
});

if (getPulsar().getConfig().isTransactionCoordinatorEnabled()
&& serviceUnit.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)) {
TransactionMetadataStoreService metadataStoreService =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,11 +609,16 @@ protected CompletableFuture<Boolean> isBundleOwnedByAnyBroker(NamespaceName fqnn
NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange);
NamespaceService nsService = pulsar().getNamespaceService();

if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return nsService.checkOwnershipPresentAsync(nsBundle);
}

LookupOptions options = LookupOptions.builder()
.authoritative(false)
.requestHttps(isRequestHttps())
.readOnly(true)
.loadTopicsInBundle(false).build();

return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(Optional::isPresent);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -66,13 +65,10 @@ protected ExtensibleLoadManagerImplBaseTest(String defaultTestNamespace) {

protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
conf.setForceDeleteNamespaceAllowed(true);
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
conf.setAllowAutoTopicCreation(true);
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
conf.setLoadBalancerSheddingEnabled(false);
conf.setLoadBalancerDebugModeEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
return conf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -237,6 +238,32 @@ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String,
assertTrue(brokerLookupData.isPresent());
}

@Test(timeOut = 30 * 1000)
public void testUnloadUponTopicLookupFailure() throws Exception {
TopicName topicName =
TopicName.get("public/test/testUnloadUponTopicLookupFailure");
NamespaceBundle bundle = pulsar1.getNamespaceService().getBundle(topicName);
primaryLoadManager.assign(Optional.empty(), bundle).get();

CompletableFuture future1 = new CompletableFuture();
CompletableFuture future2 = new CompletableFuture();
try {
pulsar1.getBrokerService().getTopics().put(topicName.toString(), future1);
pulsar2.getBrokerService().getTopics().put(topicName.toString(), future2);
CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS).execute(() -> {
future1.completeExceptionally(new CompletionException(
new BrokerServiceException.ServiceUnitNotReadyException("Please redo the lookup")));
future2.completeExceptionally(new CompletionException(
new BrokerServiceException.ServiceUnitNotReadyException("Please redo the lookup")));
});
admin.namespaces().unloadNamespaceBundle(bundle.getNamespaceObject().toString(), bundle.getBundleRange());
} finally {
pulsar1.getBrokerService().getTopics().remove(topicName.toString());
pulsar2.getBrokerService().getTopics().remove(topicName.toString());
}
}


@Test(timeOut = 30 * 1000)
public void testUnloadAdminAPI() throws Exception {
TopicName topicName = TopicName.get(defaultTestNamespace + "/test-unload");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,53 +94,59 @@ public void testTimeout() throws IllegalAccessException {
public void testSuccess() throws IllegalAccessException, ExecutionException, InterruptedException {
UnloadCounter counter = new UnloadCounter();
UnloadManager manager = new UnloadManager(counter);
String dstBroker = "broker-2";
String srcBroker = "broker-1";
String bundle = "bundle-1";
var unloadDecision =
new UnloadDecision(new Unload("broker-1", "bundle-1"), Success, Admin);
new UnloadDecision(new Unload(srcBroker, bundle), Success, Admin);
CompletableFuture<Void> future =
manager.waitAsync(CompletableFuture.completedFuture(null),
"bundle-1", unloadDecision, 5, TimeUnit.SECONDS);
bundle, unloadDecision, 5, TimeUnit.SECONDS);
Map<String, CompletableFuture<Void>> inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);

assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Assigning, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Assigning, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Deleted, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Deleted, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Splitting, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Splitting, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Releasing, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Init, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Free, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);
future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1);

// Success with Owned state.
future = manager.waitAsync(CompletableFuture.completedFuture(null),
"bundle-1", unloadDecision, 5, TimeUnit.SECONDS);
bundle, unloadDecision, 5, TimeUnit.SECONDS);
inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, null, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);
future.get();

future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 2);
}

Expand All @@ -158,7 +164,7 @@ public void testFailedStage() throws IllegalAccessException {
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", VERSION_ID_INIT),
new ServiceUnitStateData(ServiceUnitState.Owned, null, "broker-1", VERSION_ID_INIT),
new IllegalStateException("Failed stage."));

try {
Expand Down

0 comments on commit 9be9846

Please sign in to comment.