Skip to content

Commit

Permalink
fix conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhil-ctds committed Dec 24, 2024
1 parent 2b13845 commit 30d3b59
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3314,15 +3314,15 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,

@FieldContext(
category = CATEGORY_SERVER,
doc = "Opt-out of topic-existence check when setting permissions"
doc = "The class name of the factory that implements the topic compaction service."
)
private boolean allowAclChangesOnNonExistentTopics = false;
private String compactionServiceFactoryClassName = "org.apache.pulsar.compaction.PulsarCompactionServiceFactory";

@FieldContext(
category = CATEGORY_SERVER,
doc = "The class name of the factory that implements the topic compaction service."
doc = "Opt-out of topic-existence check when setting permissions"
)
private String compactionServiceFactoryClassName = "org.apache.pulsar.compaction.PulsarCompactionServiceFactory";
private boolean allowAclChangesOnNonExistentTopics = false;

/**** --- KeyStore TLS config variables. --- ****/
@FieldContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AuthPolicies;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
Expand Down Expand Up @@ -225,43 +224,14 @@ protected CompletableFuture<Map<String, Set<AuthAction>>> internalGetPermissions
CompletableFuture<Void> validateAccessForTenantCf =
validateAdminAccessForTenantAsync(namespaceName.getTenant());

var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
if (checkIfTopicExists) {
validateAccessForTenantCf = validateAccessForTenantCf
.thenCompose(__ -> internalCheckTopicExists(topicName));
}
// var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
// if (checkIfTopicExists) {
// validateAccessForTenantCf = validateAccessForTenantCf
// .thenCompose(__ -> internalCheckTopicExists(topicName));
// }

return validateAccessForTenantCf
.thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName)
.thenApply(policies -> {
if (!policies.isPresent()) {
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
}

Map<String, Set<AuthAction>> permissions = new HashMap<>();
String topicUri = topicName.toString();
AuthPolicies auth = policies.get().auth_policies;
// First add namespace level permissions
auth.getNamespaceAuthentication().forEach(permissions::put);

// Then add topic level permissions
if (auth.getTopicAuthentication().containsKey(topicUri)) {
for (Map.Entry<String, Set<AuthAction>> entry :
auth.getTopicAuthentication().get(topicUri).entrySet()) {
String role = entry.getKey();
Set<AuthAction> topicPermissions = entry.getValue();

if (!permissions.containsKey(role)) {
permissions.put(role, topicPermissions);
} else {
// Do the union between namespace and topic level
Set<AuthAction> union = Sets.union(permissions.get(role), topicPermissions);
permissions.put(role, union);
}
}
}
return permissions;
}));
.thenCompose(__ -> getAuthorizationService().getPermissionsAsync(topicName));
}

protected void validateCreateTopic(TopicName topicName) {
Expand Down Expand Up @@ -314,11 +284,11 @@ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse
CompletableFuture<Void> validateAccessForTenantCf = validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());

var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
if (checkIfTopicExists) {
validateAccessForTenantCf = validateAccessForTenantCf
.thenCompose(__ -> internalCheckTopicExists(topicName));
}
// var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
// if (checkIfTopicExists) {
// validateAccessForTenantCf = validateAccessForTenantCf
// .thenCompose(__ -> internalCheckTopicExists(topicName));
// }

validateAccessForTenantCf
.thenCompose(unused1 -> grantPermissionsAsync(topicName, role, actions))
Expand All @@ -337,11 +307,11 @@ protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, Str
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());

var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
if (checkIfTopicExists) {
validateAccessForTenantCf = validateAccessForTenantCf
.thenCompose(__ -> internalCheckTopicExists(topicName));
}
// var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
// if (checkIfTopicExists) {
// validateAccessForTenantCf = validateAccessForTenantCf
// .thenCompose(__ -> internalCheckTopicExists(topicName));
// }

validateAccessForTenantCf
.thenCompose(unused1 -> getPartitionedTopicMetadataAsync(topicName, true, false)
Expand Down

0 comments on commit 30d3b59

Please sign in to comment.