From 8da3bf8322c536c495541c80926cdf9389612515 Mon Sep 17 00:00:00 2001 From: Girish Sharma Date: Sun, 1 Sep 2024 09:49:45 +0530 Subject: [PATCH] [improve][admin] PIP-369 Introduce `unload` flag in `ns-isolation-policy set` call (#23120) Co-authored-by: Zixuan Liu --- .../broker/admin/impl/ClustersBase.java | 54 ++++- .../pulsar/broker/admin/AdminApi2Test.java | 208 ++++++++++++++++-- .../policies/data/NamespaceIsolationData.java | 4 + .../NamespaceIsolationPolicyUnloadScope.java | 37 ++++ .../cli/CmdNamespaceIsolationPolicy.java | 17 +- .../policies/NamespaceIsolationPolicy.java | 6 + .../data/NamespaceIsolationDataImpl.java | 17 +- .../impl/NamespaceIsolationPolicyImpl.java | 8 + 8 files changed, 324 insertions(+), 27 deletions(-) create mode 100644 pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationPolicyUnloadScope.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 4fe8a01e679da..132c99ce16bec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -27,11 +27,13 @@ import io.swagger.annotations.ExampleProperty; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -65,6 +67,7 @@ import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl; import org.apache.pulsar.common.policies.data.FailureDomainImpl; import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl; +import org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl; import org.apache.pulsar.common.util.FutureUtil; @@ -721,10 +724,13 @@ public void setNamespaceIsolationPolicy( .setIsolationDataWithCreateAsync(cluster, (p) -> Collections.emptyMap()) .thenApply(__ -> new NamespaceIsolationPolicies())) ).thenCompose(nsIsolationPolicies -> { + NamespaceIsolationDataImpl oldPolicy = nsIsolationPolicies + .getPolicies().getOrDefault(policyName, null); nsIsolationPolicies.setPolicy(policyName, policyData); return namespaceIsolationPolicies() - .setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies()); - }).thenCompose(__ -> filterAndUnloadMatchedNamespaceAsync(cluster, policyData)) + .setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies()) + .thenApply(__ -> oldPolicy); + }).thenCompose(oldPolicy -> filterAndUnloadMatchedNamespaceAsync(cluster, policyData, oldPolicy)) .thenAccept(__ -> { log.info("[{}] Successful to update clusters/{}/namespaceIsolationPolicies/{}.", clientAppId(), cluster, policyName); @@ -759,7 +765,13 @@ public void setNamespaceIsolationPolicy( * Get matched namespaces; call unload for each namespaces. */ private CompletableFuture filterAndUnloadMatchedNamespaceAsync(String cluster, - NamespaceIsolationDataImpl policyData) { + NamespaceIsolationDataImpl policyData, + NamespaceIsolationDataImpl oldPolicy) { + // exit early if none of the namespaces need to be unloaded + if (NamespaceIsolationPolicyUnloadScope.none.equals(policyData.getUnloadScope())) { + return CompletableFuture.completedFuture(null); + } + PulsarAdmin adminClient; try { adminClient = pulsar().getAdminClient(); @@ -768,6 +780,7 @@ private CompletableFuture filterAndUnloadMatchedNamespaceAsync(String clus } // compile regex patterns once List namespacePatterns = policyData.getNamespaces().stream().map(Pattern::compile).toList(); + // TODO for 4.x, we should include both old and new namespace regex pattern for unload `all_matching` option return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> { List>> filteredNamespacesForEachTenant = tenants.stream() .map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> { @@ -793,6 +806,41 @@ private CompletableFuture filterAndUnloadMatchedNamespaceAsync(String clus if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) { return CompletableFuture.completedFuture(null); } + // If unload type is 'changed', we need to figure out a further subset of namespaces whose placement might + // actually have been changed. + + log.debug("Old policy: {} ; new policy: {}", oldPolicy, policyData); + if (oldPolicy != null && NamespaceIsolationPolicyUnloadScope.changed.equals(policyData.getUnloadScope())) { + // We also compare that the previous primary broker list is same as current, in case all namespaces need + // to be placed again anyway. + if (CollectionUtils.isEqualCollection(oldPolicy.getPrimary(), policyData.getPrimary())) { + // list is same, so we continue finding the changed namespaces. + + // We create a union regex list contains old + new regexes + Set combinedNamespaces = new HashSet<>(oldPolicy.getNamespaces()); + combinedNamespaces.addAll(policyData.getNamespaces()); + // We create a intersection of the old and new regexes. These won't need to be unloaded + Set commonNamespaces = new HashSet<>(oldPolicy.getNamespaces()); + commonNamespaces.retainAll(policyData.getNamespaces()); + + log.debug("combined regexes: {}; common regexes:{}", combinedNamespaces, combinedNamespaces); + + // Find the changed regexes (new - new ∩ old). TODO for 4.x, make this (new U old - new ∩ old) + combinedNamespaces.removeAll(commonNamespaces); + + log.debug("changed regexes: {}", commonNamespaces); + + // Now we further filter the filtered namespaces based on this combinedNamespaces set + shouldUnloadNamespaces = shouldUnloadNamespaces.stream() + .filter(name -> combinedNamespaces.stream() + .map(Pattern::compile) + .anyMatch(pattern -> pattern.matcher(name).matches()) + ).toList(); + + } + } + // unload type is either null or not in (changed, none), so we proceed to unload all namespaces + // TODO - default in 4.x should become `changed` List> futures = shouldUnloadNamespaces.stream() .map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName)) .collect(Collectors.toList()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 40e2ca8cce905..155994c814c11 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -22,6 +22,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH; +import static org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -53,6 +54,7 @@ import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.ws.rs.NotAcceptableException; @@ -109,27 +111,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; -import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; -import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; -import org.apache.pulsar.common.policies.data.BacklogQuota; -import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; -import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl; -import org.apache.pulsar.common.policies.data.BundlesData; -import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ConsumerStats; -import org.apache.pulsar.common.policies.data.EntryFilters; -import org.apache.pulsar.common.policies.data.FailureDomain; -import org.apache.pulsar.common.policies.data.NamespaceIsolationData; -import org.apache.pulsar.common.policies.data.NonPersistentTopicStats; -import org.apache.pulsar.common.policies.data.PartitionedTopicStats; -import org.apache.pulsar.common.policies.data.PersistencePolicies; -import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; -import org.apache.pulsar.common.policies.data.RetentionPolicies; -import org.apache.pulsar.common.policies.data.SubscriptionStats; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.apache.pulsar.common.policies.data.TopicStats; -import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.common.policies.data.*; import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; @@ -3496,4 +3478,188 @@ public void testGetStatsIfPartitionNotExists() throws Exception { // cleanup. admin.topics().deletePartitionedTopic(partitionedTp); } + + private NamespaceIsolationData createPolicyData(NamespaceIsolationPolicyUnloadScope scope, List namespaces, + List primaryBrokers + ) { + // setup ns-isolation-policy in both the clusters. + Map parameters1 = new HashMap<>(); + parameters1.put("min_limit", "1"); + parameters1.put("usage_threshold", "100"); + List nsRegexList = new ArrayList<>(namespaces); + + return NamespaceIsolationData.builder() + // "prop-ig/ns1" is present in test cluster, policy set on test2 should work + .namespaces(nsRegexList) + .primary(primaryBrokers) + .secondary(Collections.singletonList("")) + .autoFailoverPolicy(AutoFailoverPolicyData.builder() + .policyType(AutoFailoverPolicyType.min_available) + .parameters(parameters1) + .build()) + .unloadScope(scope) + .build(); + } + + private boolean allTopicsUnloaded(List topics) { + for (String topic : topics) { + if (pulsar.getBrokerService().getTopicReference(topic).isPresent()) { + return false; + } + } + return true; + } + + private void loadTopics(List topics) throws PulsarClientException, ExecutionException, InterruptedException { + // create a topic by creating a producer so that the topic is present on the broker + for (String topic : topics) { + Producer producer = pulsarClient.newProducer().topic(topic).create(); + producer.close(); + pulsar.getBrokerService().getTopicIfExists(topic).get(); + } + + // All namespaces are loaded onto broker. Assert that + for (String topic : topics) { + assertTrue(pulsar.getBrokerService().getTopicReference(topic).isPresent()); + } + } + + /** + * Validates that the namespace isolation policy set and update is unloading only the relevant namespaces based on + * the unload scope provided. + * + * @param topicType persistent or non persistent. + * @param policyName policy name. + * @param nsPrefix unique namespace prefix. + * @param totalNamespaces total namespaces to create. Only the end part. Each namespace also gets a topic t1. + * @param initialScope unload scope while creating the policy. + * @param initialNamespaceRegex namespace regex while creating the policy. + * @param initialLoadedNS expected namespaces to be still loaded after the policy create call. Remaining namespaces + * will be asserted to be unloaded within 20 seconds. + * @param updatedScope unload scope while updating the policy. + * @param updatedNamespaceRegex namespace regex while updating the policy. + * @param updatedLoadedNS expected namespaces to be loaded after policy update call. Remaining namespaces will be + * asserted to be unloaded within 20 seconds. + * @throws PulsarAdminException + * @throws PulsarClientException + * @throws ExecutionException + * @throws InterruptedException + */ + private void testIsolationPolicyUnloadsNSWithScope(String topicType, String policyName, String nsPrefix, + List totalNamespaces, + NamespaceIsolationPolicyUnloadScope initialScope, + List initialNamespaceRegex, List initialLoadedNS, + NamespaceIsolationPolicyUnloadScope updatedScope, + List updatedNamespaceRegex, List updatedLoadedNS, + List updatedBrokerRegex) + throws PulsarAdminException, PulsarClientException, ExecutionException, InterruptedException { + + // Create all namespaces + List allTopics = new ArrayList<>(); + for (String namespacePart: totalNamespaces) { + admin.namespaces().createNamespace(nsPrefix + namespacePart, Set.of("test")); + allTopics.add(topicType + "://" + nsPrefix + namespacePart + "/t1"); + } + // Load all topics so that they are present. Assume topic t1 under each namespace + loadTopics(allTopics); + + // Create the policy + NamespaceIsolationData nsPolicyData1 = createPolicyData( + initialScope, initialNamespaceRegex, Collections.singletonList(".*") + ); + admin.clusters().createNamespaceIsolationPolicy("test", policyName, nsPolicyData1); + + List initialLoadedTopics = new ArrayList<>(); + for (String namespacePart: initialLoadedNS) { + initialLoadedTopics.add(topicType + "://" + nsPrefix + namespacePart + "/t1"); + } + + List initialUnloadedTopics = new ArrayList<>(allTopics); + initialUnloadedTopics.removeAll(initialLoadedTopics); + + // Assert that all topics (and thus ns) not under initialLoadedNS namespaces are unloaded + if (initialUnloadedTopics.isEmpty()) { + // Just wait a bit to ensure we don't miss lazy unloading of topics we expect not to unload + TimeUnit.SECONDS.sleep(5); + } else { + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .until(() -> allTopicsUnloaded(initialUnloadedTopics)); + } + // Assert that all topics under initialLoadedNS are still present + initialLoadedTopics.forEach(t -> assertTrue(pulsar.getBrokerService().getTopicReference(t).isPresent())); + + // Load the topics again + loadTopics(allTopics); + + // Update policy using updatedScope with updated namespace regex + nsPolicyData1 = createPolicyData(updatedScope, updatedNamespaceRegex, updatedBrokerRegex); + admin.clusters().updateNamespaceIsolationPolicy("test", policyName, nsPolicyData1); + + List updatedLoadedTopics = new ArrayList<>(); + for (String namespacePart : updatedLoadedNS) { + updatedLoadedTopics.add(topicType + "://" + nsPrefix + namespacePart + "/t1"); + } + + List updatedUnloadedTopics = new ArrayList<>(allTopics); + updatedUnloadedTopics.removeAll(updatedLoadedTopics); + + // Assert that all topics (and thus ns) not under updatedLoadedNS namespaces are unloaded + if (updatedUnloadedTopics.isEmpty()) { + // Just wait a bit to ensure we don't miss lazy unloading of topics we expect not to unload + TimeUnit.SECONDS.sleep(5); + } else { + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .until(() -> allTopicsUnloaded(updatedUnloadedTopics)); + } + // Assert that all topics under updatedLoadedNS are still present + updatedLoadedTopics.forEach(t -> assertTrue(pulsar.getBrokerService().getTopicReference(t).isPresent())); + + } + + @Test(dataProvider = "topicType") + public void testIsolationPolicyUnloadsNSWithAllScope(final String topicType) throws Exception { + String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-"; + testIsolationPolicyUnloadsNSWithScope( + topicType, "policy-all", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"), + all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"), + all_matching, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("b1", "b2"), + Collections.singletonList(".*") + ); + } + + @Test(dataProvider = "topicType") + public void testIsolationPolicyUnloadsNSWithChangedScope(final String topicType) throws Exception { + String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-"; + testIsolationPolicyUnloadsNSWithScope( + topicType, "policy-changed", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"), + all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"), + changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2"), + Collections.singletonList(".*") + ); + } + + @Test(dataProvider = "topicType") + public void testIsolationPolicyUnloadsNSWithNoneScope(final String topicType) throws Exception { + String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-"; + testIsolationPolicyUnloadsNSWithScope( + topicType, "policy-none", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"), + all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"), + none, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2", "c1"), + Collections.singletonList(".*") + ); + } + + @Test(dataProvider = "topicType") + public void testIsolationPolicyUnloadsNSWithPrimaryChanged(final String topicType) throws Exception { + String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-"; + // As per changed flag, only c1 should unload, but due to primary change, both a* and c* will. + testIsolationPolicyUnloadsNSWithScope( + topicType, "policy-primary-changed", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"), + all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"), + changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("b1", "b2"), + List.of(".*", "broker.*") + ); + } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java index aa48e69c14571..4f367f72fda33 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java @@ -31,6 +31,8 @@ public interface NamespaceIsolationData { AutoFailoverPolicyData getAutoFailoverPolicy(); + NamespaceIsolationPolicyUnloadScope getUnloadScope(); + void validate(); interface Builder { @@ -42,6 +44,8 @@ interface Builder { Builder autoFailoverPolicy(AutoFailoverPolicyData autoFailoverPolicyData); + Builder unloadScope(NamespaceIsolationPolicyUnloadScope unloadScope); + NamespaceIsolationData build(); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationPolicyUnloadScope.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationPolicyUnloadScope.java new file mode 100644 index 0000000000000..2edeac45630f5 --- /dev/null +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationPolicyUnloadScope.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +/** + * The type of unload to perform while setting the isolation policy. + */ +public enum NamespaceIsolationPolicyUnloadScope { + all_matching, // unloads all matching namespaces as per new regex + none, // unloads no namespaces + changed; // unloads only the namespaces which are newly added or removed from the regex list + + public static NamespaceIsolationPolicyUnloadScope fromString(String unloadScopeString) { + for (NamespaceIsolationPolicyUnloadScope unloadScope : NamespaceIsolationPolicyUnloadScope.values()) { + if (unloadScope.toString().equalsIgnoreCase(unloadScopeString)) { + return unloadScope; + } + } + return null; + } +} diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java index e9896decd8c96..0f5f6b211a544 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java @@ -32,6 +32,7 @@ import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl; +import org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope; import picocli.CommandLine.Command; import picocli.CommandLine.Option; import picocli.CommandLine.Parameters; @@ -73,10 +74,19 @@ private class SetPolicy extends CliCommand { required = true, split = ",") private Map autoFailoverPolicyParams; + @Option(names = "--unload-scope", description = "configure the type of unload to do -" + + " ['all_matching', 'none', 'changed'] namespaces. By default, all namespaces matching the namespaces" + + " regex will be unloaded and placed again. You can choose to not unload any namespace while setting" + + " this new policy by choosing `none` or choose to unload only the namespaces whose placement will" + + " actually change. If you chose 'none', you will need to manually unload the namespaces for them to" + + " be placed correctly, or wait till some namespaces get load balanced automatically based on load" + + " shedding configurations.") + private NamespaceIsolationPolicyUnloadScope unloadScope; + void run() throws PulsarAdminException { // validate and create the POJO NamespaceIsolationData namespaceIsolationData = createNamespaceIsolationData(namespaces, primary, secondary, - autoFailoverPolicyTypeName, autoFailoverPolicyParams); + autoFailoverPolicyTypeName, autoFailoverPolicyParams, unloadScope); getAdmin().clusters().createNamespaceIsolationPolicy(clusterName, policyName, namespaceIsolationData); } @@ -167,7 +177,8 @@ private NamespaceIsolationData createNamespaceIsolationData(List namespa List primary, List secondary, String autoFailoverPolicyTypeName, - Map autoFailoverPolicyParams) { + Map autoFailoverPolicyParams, + NamespaceIsolationPolicyUnloadScope unload) { // validate namespaces = validateList(namespaces); @@ -234,6 +245,8 @@ private NamespaceIsolationData createNamespaceIsolationData(List namespa throw new ParameterException("Unknown auto failover policy type specified : " + autoFailoverPolicyTypeName); } + nsIsolationDataBuilder.unloadScope(unload); + return nsIsolationDataBuilder.build(); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java index bd28d30d4cee9..52480d91eefa4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java @@ -23,6 +23,7 @@ import java.util.SortedSet; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.BrokerStatus; +import org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope; /** * Namespace isolation policy. @@ -43,6 +44,11 @@ public interface NamespaceIsolationPolicy { */ List getSecondaryBrokers(); + /** + * Get the unload scope for the policy set call. + */ + NamespaceIsolationPolicyUnloadScope getUnloadScope(); + /** * Get the list of primary brokers for the namespace according to the policy. * diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java index bdb51f63f89ed..1e72f0e50ee05 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java @@ -75,6 +75,15 @@ public class NamespaceIsolationDataImpl implements NamespaceIsolationData { @JsonProperty("auto_failover_policy") private AutoFailoverPolicyData autoFailoverPolicy; + @ApiModelProperty( + name = "unload_scope", + value = "The type of unload to perform while applying the new isolation policy.", + example = "'all_matching' (default) for unloading all matching namespaces. 'none' for not unloading " + + "any namespace. 'changed' for unloading only the namespaces whose placement is actually changing" + ) + @JsonProperty("unload_scope") + private NamespaceIsolationPolicyUnloadScope unloadScope; + public static NamespaceIsolationDataImplBuilder builder() { return new NamespaceIsolationDataImplBuilder(); } @@ -106,6 +115,7 @@ public static class NamespaceIsolationDataImplBuilder implements NamespaceIsolat private List primary = new ArrayList<>(); private List secondary = new ArrayList<>(); private AutoFailoverPolicyData autoFailoverPolicy; + private NamespaceIsolationPolicyUnloadScope unloadScope; public NamespaceIsolationDataImplBuilder namespaces(List namespaces) { this.namespaces = namespaces; @@ -127,8 +137,13 @@ public NamespaceIsolationDataImplBuilder autoFailoverPolicy(AutoFailoverPolicyDa return this; } + public NamespaceIsolationDataImplBuilder unloadScope(NamespaceIsolationPolicyUnloadScope unloadScope) { + this.unloadScope = unloadScope; + return this; + } + public NamespaceIsolationDataImpl build() { - return new NamespaceIsolationDataImpl(namespaces, primary, secondary, autoFailoverPolicy); + return new NamespaceIsolationDataImpl(namespaces, primary, secondary, autoFailoverPolicy, unloadScope); } } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java index af3663869fa02..440282f29cb36 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java @@ -29,6 +29,7 @@ import org.apache.pulsar.common.policies.NamespaceIsolationPolicy; import org.apache.pulsar.common.policies.data.BrokerStatus; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; +import org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope; /** * Implementation of the namespace isolation policy. @@ -39,6 +40,7 @@ public class NamespaceIsolationPolicyImpl implements NamespaceIsolationPolicy { private List primary; private List secondary; private AutoFailoverPolicy autoFailoverPolicy; + private NamespaceIsolationPolicyUnloadScope unloadScope; private boolean matchNamespaces(String fqnn) { for (String nsRegex : namespaces) { @@ -64,6 +66,7 @@ public NamespaceIsolationPolicyImpl(NamespaceIsolationData policyData) { this.primary = policyData.getPrimary(); this.secondary = policyData.getSecondary(); this.autoFailoverPolicy = AutoFailoverPolicyFactory.create(policyData.getAutoFailoverPolicy()); + this.unloadScope = policyData.getUnloadScope(); } @Override @@ -76,6 +79,11 @@ public List getSecondaryBrokers() { return this.secondary; } + @Override + public NamespaceIsolationPolicyUnloadScope getUnloadScope() { + return this.unloadScope; + } + @Override public List findPrimaryBrokers(List availableBrokers, NamespaceName namespace) { if (!this.matchNamespaces(namespace.toString())) {