Skip to content

Commit

Permalink
[fix][admin] Listen partitioned topic creation event (apache#23680)
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
(cherry picked from commit 0a2ffe4)
(cherry picked from commit 7dcc255)
  • Loading branch information
nodece authored and nikhil-ctds committed Dec 12, 2024
1 parent 74fefda commit d8bc659
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.service.TopicEventsListener.EventStage;
import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent;
import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
Expand Down Expand Up @@ -163,6 +165,10 @@ public CompletableFuture<Void> validatePoliciesReadOnlyAccessAsync() {

protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) {
if (!topicName.isPersistent()) {
for (int i = 0; i < numPartitions; i++) {
pulsar().getBrokerService().getTopicEventsDispatcher()
.notify(topicName.getPartition(i).toString(), TopicEvent.CREATE, EventStage.SUCCESS);
}
return CompletableFuture.completedFuture(null);
}
List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions);
Expand Down Expand Up @@ -198,6 +204,8 @@ private CompletableFuture<Void> tryCreatePartitionAsync(final int partition) {
}
return null;
});
pulsar().getBrokerService().getTopicEventsDispatcher()
.notifyOnCompletion(result, topicName.getPartition(partition).toString(), TopicEvent.CREATE);
return result;
}

Expand Down Expand Up @@ -599,6 +607,13 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
throw new RestException(Status.CONFLICT, "This topic already exists");
}
})
.thenRun(() -> {
for (int i = 0; i < numPartitions; i++) {
pulsar().getBrokerService().getTopicEventsDispatcher()
.notify(topicName.getPartition(i).toString(), TopicEvent.CREATE,
EventStage.BEFORE);
}
})
.thenCompose(__ -> provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly, properties))
.thenCompose(__ -> tryCreatePartitionsAsync(numPartitions))
.thenRun(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,23 @@ public void testTopicAutoGC(String topicTypePersistence, String topicTypePartiti
private void createTopicAndVerifyEvents(String topicDomain, String topicTypePartitioned, String topicName) throws Exception {
final String[] expectedEvents;
if (topicDomain.equalsIgnoreCase("persistent") || topicTypePartitioned.equals("partitioned")) {
expectedEvents = new String[]{
"LOAD__BEFORE",
"CREATE__BEFORE",
"CREATE__SUCCESS",
"LOAD__SUCCESS"
};
if (topicTypePartitioned.equals("partitioned")) {
expectedEvents = new String[]{
"CREATE__BEFORE",
"CREATE__SUCCESS",
"LOAD__BEFORE",
"CREATE__BEFORE",
"CREATE__SUCCESS",
"LOAD__SUCCESS"
};
} else {
expectedEvents = new String[]{
"LOAD__BEFORE",
"CREATE__BEFORE",
"CREATE__SUCCESS",
"LOAD__SUCCESS"
};
}
} else {
expectedEvents = new String[]{
// Before https://github.com/apache/pulsar/pull/21995, Pulsar will skip create topic if the topic
Expand Down

0 comments on commit d8bc659

Please sign in to comment.