diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 9e92d3664fd11..b747482ac1aae 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -3314,15 +3314,15 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, @FieldContext( category = CATEGORY_SERVER, - doc = "Opt-out of topic-existence check when setting permissions" + doc = "The class name of the factory that implements the topic compaction service." ) - private boolean allowAclChangesOnNonExistentTopics = false; + private String compactionServiceFactoryClassName = "org.apache.pulsar.compaction.PulsarCompactionServiceFactory"; @FieldContext( category = CATEGORY_SERVER, - doc = "The class name of the factory that implements the topic compaction service." + doc = "Opt-out of topic-existence check when setting permissions" ) - private String compactionServiceFactoryClassName = "org.apache.pulsar.compaction.PulsarCompactionServiceFactory"; + private boolean allowAclChangesOnNonExistentTopics = false; /**** --- KeyStore TLS config variables. --- ****/ @FieldContext( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 9edcda5359d71..2d8980227162c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -116,7 +116,6 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; -import org.apache.pulsar.common.policies.data.AuthPolicies; import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; @@ -225,43 +224,14 @@ protected CompletableFuture>> internalGetPermissions CompletableFuture validateAccessForTenantCf = validateAdminAccessForTenantAsync(namespaceName.getTenant()); - var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics(); - if (checkIfTopicExists) { - validateAccessForTenantCf = validateAccessForTenantCf - .thenCompose(__ -> internalCheckTopicExists(topicName)); - } +// var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics(); +// if (checkIfTopicExists) { +// validateAccessForTenantCf = validateAccessForTenantCf +// .thenCompose(__ -> internalCheckTopicExists(topicName)); +// } return validateAccessForTenantCf - .thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName) - .thenApply(policies -> { - if (!policies.isPresent()) { - throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); - } - - Map> permissions = new HashMap<>(); - String topicUri = topicName.toString(); - AuthPolicies auth = policies.get().auth_policies; - // First add namespace level permissions - auth.getNamespaceAuthentication().forEach(permissions::put); - - // Then add topic level permissions - if (auth.getTopicAuthentication().containsKey(topicUri)) { - for (Map.Entry> entry : - auth.getTopicAuthentication().get(topicUri).entrySet()) { - String role = entry.getKey(); - Set topicPermissions = entry.getValue(); - - if (!permissions.containsKey(role)) { - permissions.put(role, topicPermissions); - } else { - // Do the union between namespace and topic level - Set union = Sets.union(permissions.get(role), topicPermissions); - permissions.put(role, union); - } - } - } - return permissions; - })); + .thenCompose(__ -> getAuthorizationService().getPermissionsAsync(topicName)); } protected void validateCreateTopic(TopicName topicName) { @@ -314,11 +284,11 @@ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse CompletableFuture validateAccessForTenantCf = validateAdminAccessForTenantAsync(namespaceName.getTenant()) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()); - var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics(); - if (checkIfTopicExists) { - validateAccessForTenantCf = validateAccessForTenantCf - .thenCompose(__ -> internalCheckTopicExists(topicName)); - } +// var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics(); +// if (checkIfTopicExists) { +// validateAccessForTenantCf = validateAccessForTenantCf +// .thenCompose(__ -> internalCheckTopicExists(topicName)); +// } validateAccessForTenantCf .thenCompose(unused1 -> grantPermissionsAsync(topicName, role, actions)) @@ -337,11 +307,11 @@ protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, Str validateAdminAccessForTenantAsync(namespaceName.getTenant()) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()); - var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics(); - if (checkIfTopicExists) { - validateAccessForTenantCf = validateAccessForTenantCf - .thenCompose(__ -> internalCheckTopicExists(topicName)); - } +// var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics(); +// if (checkIfTopicExists) { +// validateAccessForTenantCf = validateAccessForTenantCf +// .thenCompose(__ -> internalCheckTopicExists(topicName)); +// } validateAccessForTenantCf .thenCompose(unused1 -> getPartitionedTopicMetadataAsync(topicName, true, false)