diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 26ee45b7444f3..855e8ccfcaaf3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -195,13 +195,14 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { /** * Get all the bundles that are owned by this broker. */ - public Set getOwnedServiceUnits() { + public CompletableFuture> getOwnedServiceUnitsAsync() { if (!started) { log.warn("Failed to get owned service units, load manager is not started."); - return Collections.emptySet(); + return CompletableFuture.completedFuture(Collections.emptySet()); } - Set> entrySet = serviceUnitStateChannel.getOwnershipEntrySet(); + String brokerId = brokerRegistry.getBrokerId(); + Set> entrySet = serviceUnitStateChannel.getOwnershipEntrySet(); Set ownedServiceUnits = entrySet.stream() .filter(entry -> { var stateData = entry.getValue(); @@ -214,34 +215,26 @@ public Set getOwnedServiceUnits() { }).collect(Collectors.toSet()); // Add heartbeat and SLA monitor namespace bundle. NamespaceName heartbeatNamespace = NamespaceService.getHeartbeatNamespace(brokerId, pulsar.getConfiguration()); - try { - NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory() - .getFullBundle(heartbeatNamespace); - ownedServiceUnits.add(fullBundle); - } catch (Exception e) { - log.warn("Failed to get heartbeat namespace bundle.", e); - } NamespaceName heartbeatNamespaceV2 = NamespaceService .getHeartbeatNamespaceV2(brokerId, pulsar.getConfiguration()); - try { - NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory() - .getFullBundle(heartbeatNamespaceV2); - ownedServiceUnits.add(fullBundle); - } catch (Exception e) { - log.warn("Failed to get heartbeat namespace V2 bundle.", e); - } - NamespaceName slaMonitorNamespace = NamespaceService .getSLAMonitorNamespace(brokerId, pulsar.getConfiguration()); - try { - NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory() - .getFullBundle(slaMonitorNamespace); - ownedServiceUnits.add(fullBundle); - } catch (Exception e) { - log.warn("Failed to get SLA Monitor namespace bundle.", e); - } - - return ownedServiceUnits; + return pulsar.getNamespaceService().getNamespaceBundleFactory() + .getFullBundleAsync(heartbeatNamespace) + .thenAccept(fullBundle -> ownedServiceUnits.add(fullBundle)).exceptionally(e -> { + log.warn("Failed to get heartbeat namespace bundle.", e); + return null; + }).thenCompose(__ -> pulsar.getNamespaceService().getNamespaceBundleFactory() + .getFullBundleAsync(heartbeatNamespaceV2)) + .thenAccept(fullBundle -> ownedServiceUnits.add(fullBundle)).exceptionally(e -> { + log.warn("Failed to get heartbeat namespace V2 bundle.", e); + return null; + }).thenCompose(__ -> pulsar.getNamespaceService().getNamespaceBundleFactory() + .getFullBundleAsync(slaMonitorNamespace)) + .thenAccept(fullBundle -> ownedServiceUnits.add(fullBundle)).exceptionally(e -> { + log.warn("Failed to get SLA Monitor namespace bundle.", e); + return null; + }).thenApply(__ -> ownedServiceUnits); } public enum Role { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index a15dfce2dcf0a..873a4493c8bde 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -801,12 +801,12 @@ public CompletableFuture> getOwnedNameSpac if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); - var statusMap = extensibleLoadManager.getOwnedServiceUnits().stream() - .collect(Collectors.toMap(NamespaceBundle::toString, - bundle -> getNamespaceOwnershipStatus(true, - namespaceIsolationPolicies.getPolicyByNamespace( - bundle.getNamespaceObject())))); - return CompletableFuture.completedFuture(statusMap); + return extensibleLoadManager.getOwnedServiceUnitsAsync() + .thenApply(OwnedServiceUnits -> OwnedServiceUnits.stream() + .collect(Collectors.toMap(NamespaceBundle::toString, + bundle -> getNamespaceOwnershipStatus(true, + namespaceIsolationPolicies.getPolicyByNamespace( + bundle.getNamespaceObject()))))); } Collection> futures = ownershipCache.getOwnedBundlesAsync().values(); @@ -1122,7 +1122,12 @@ public OwnershipCache getOwnershipCache() { public Set getOwnedServiceUnits() { if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); - return extensibleLoadManager.getOwnedServiceUnits(); + try { + return extensibleLoadManager.getOwnedServiceUnitsAsync() + .get(config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException(e); + } } return ownershipCache.getOwnedBundles().values().stream().map(OwnedBundle::getNamespaceBundle) .collect(Collectors.toSet()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 169ff89fe3c0d..d5aaed3436824 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -1125,13 +1125,15 @@ public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws Exceptio .getFullBundle(slaMonitorNamespacePulsar2); - Set ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits(); + Set ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnitsAsync() + .get(5, TimeUnit.SECONDS); log.info("Owned service units: {}", ownedServiceUnitsByPulsar1); // heartbeat namespace bundle will own by pulsar1 assertTrue(ownedServiceUnitsByPulsar1.contains(bundle1)); assertTrue(ownedServiceUnitsByPulsar1.contains(bundle2)); assertTrue(ownedServiceUnitsByPulsar1.contains(slaBundle1)); - Set ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits(); + Set ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnitsAsync() + .get(5, TimeUnit.SECONDS); log.info("Owned service units: {}", ownedServiceUnitsByPulsar2); assertTrue(ownedServiceUnitsByPulsar2.contains(bundle3)); assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4)); @@ -1167,7 +1169,8 @@ private void assertOwnedServiceUnits( ExtensibleLoadManagerImpl extensibleLoadManager, NamespaceBundle bundle) throws PulsarAdminException { Awaitility.await().untilAsserted(() -> { - Set ownedBundles = extensibleLoadManager.getOwnedServiceUnits(); + Set ownedBundles = extensibleLoadManager.getOwnedServiceUnitsAsync() + .get(5, TimeUnit.SECONDS); assertTrue(ownedBundles.contains(bundle)); }); Map ownedNamespaces = @@ -1180,9 +1183,11 @@ private void assertOwnedServiceUnits( } @Test(timeOut = 30 * 1000) - public void testGetOwnedServiceUnitsWhenLoadManagerNotStart() { + public void testGetOwnedServiceUnitsWhenLoadManagerNotStart() + throws Exception { ExtensibleLoadManagerImpl loadManager = new ExtensibleLoadManagerImpl(); - Set ownedServiceUnits = loadManager.getOwnedServiceUnits(); + Set ownedServiceUnits = loadManager.getOwnedServiceUnitsAsync() + .get(5, TimeUnit.SECONDS); assertNotNull(ownedServiceUnits); assertTrue(ownedServiceUnits.isEmpty()); } @@ -1197,7 +1202,7 @@ public void testTryAcquiringOwnership() NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get(); assertTrue(Set.of(pulsar1.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl()) .contains(namespaceEphemeralData.getNativeUrl())); - admin.namespaces().deleteNamespace(namespace, true); + admin.namespaces().deleteNamespace(namespace); } @Test(timeOut = 30 * 1000)