Skip to content

Commit

Permalink
[improve][admin] Opt-out of topic-existence check (apache#23709)
Browse files Browse the repository at this point in the history
Co-authored-by: Ómar Yasin <[email protected]>
Co-authored-by: Lari Hotari <[email protected]>
(cherry picked from commit f571aa1)
(cherry picked from commit ef57c2a)
  • Loading branch information
omarkj authored and nikhil-ctds committed Dec 23, 2024
1 parent 540beac commit 6b1fb1f
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3312,6 +3312,12 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
)
private int transactionPendingAckBatchedWriteMaxDelayInMillis = 1;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Opt-out of topic-existence check when setting permissions"
)
private boolean allowAclChangesOnNonExistentTopics = false;

@FieldContext(
category = CATEGORY_SERVER,
doc = "The class name of the factory that implements the topic compaction service."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,46 @@ protected CompletableFuture<List<String>> internalGetPartitionedTopicListAsync()

protected CompletableFuture<Map<String, Set<AuthAction>>> internalGetPermissionsOnTopic() {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
return validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> getAuthorizationService().getPermissionsAsync(topicName));
CompletableFuture<Void> validateAccessForTenantCf =
validateAdminAccessForTenantAsync(namespaceName.getTenant());

var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
if (checkIfTopicExists) {
validateAccessForTenantCf = validateAccessForTenantCf
.thenCompose(__ -> internalCheckTopicExists(topicName));
}

return validateAccessForTenantCf
.thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName)
.thenApply(policies -> {
if (!policies.isPresent()) {
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
}

Map<String, Set<AuthAction>> permissions = new HashMap<>();
String topicUri = topicName.toString();
AuthPolicies auth = policies.get().auth_policies;
// First add namespace level permissions
auth.getNamespaceAuthentication().forEach(permissions::put);

// Then add topic level permissions
if (auth.getTopicAuthentication().containsKey(topicUri)) {
for (Map.Entry<String, Set<AuthAction>> entry :
auth.getTopicAuthentication().get(topicUri).entrySet()) {
String role = entry.getKey();
Set<AuthAction> topicPermissions = entry.getValue();

if (!permissions.containsKey(role)) {
permissions.put(role, topicPermissions);
} else {
// Do the union between namespace and topic level
Set<AuthAction> union = Sets.union(permissions.get(role), topicPermissions);
permissions.put(role, union);
}
}
}
return permissions;
}));
}

protected void validateCreateTopic(TopicName topicName) {
Expand Down Expand Up @@ -272,10 +310,18 @@ private CompletableFuture<Void> grantPermissionsAsync(TopicName topicUri, String
protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse, String role,
Set<AuthAction> actions) {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
grantPermissionsAsync(topicName, role, actions)
.thenAccept(unused -> asyncResponse.resume(Response.noContent().build()))))
CompletableFuture<Void> validateAccessForTenantCf = validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());

var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
if (checkIfTopicExists) {
validateAccessForTenantCf = validateAccessForTenantCf
.thenCompose(__ -> internalCheckTopicExists(topicName));
}

validateAccessForTenantCf
.thenCompose(unused1 -> grantPermissionsAsync(topicName, role, actions))
.thenAccept(unused -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause);
Expand All @@ -286,8 +332,18 @@ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse

protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, true, false)
CompletableFuture<Void> validateAccessForTenantCf =
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());

var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
if (checkIfTopicExists) {
validateAccessForTenantCf = validateAccessForTenantCf
.thenCompose(__ -> internalCheckTopicExists(topicName));
}

validateAccessForTenantCf
.thenCompose(unused1 -> getPartitionedTopicMetadataAsync(topicName, true, false)
.thenCompose(metadata -> {
int numPartitions = metadata.partitions;
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -3612,4 +3613,23 @@ public void testRetentionAndBacklogQuotaCheck() throws PulsarAdminException {
});

}

@Test
@SneakyThrows
public void testPermissionsAllowAclChangesOnNonExistentTopics() {
pulsar.getConfiguration().setAllowAclChangesOnNonExistentTopics(true);
try {
String namespace = "prop-xyz/ns1/";
final String random = UUID.randomUUID().toString();
final String topic = "persistent://" + namespace + random;
final String subject = UUID.randomUUID().toString();
admin.topics().grantPermission(topic, subject, Set.of(AuthAction.produce));
assertThat(admin.topics().getPermissions(topic).get(subject)).containsExactly(AuthAction.produce);
admin.topics().revokePermissions(topic, subject);
assertThat(admin.topics().getPermissions(topic).get(subject)).isNullOrEmpty();
} finally {
// reset config
pulsar.getConfiguration().setAllowAclChangesOnNonExistentTopics(false);
}
}
}

0 comments on commit 6b1fb1f

Please sign in to comment.