Skip to content

Commit

Permalink
[fix] [broker] Fix configurationMetadataSyncEventTopic is marked supp…
Browse files Browse the repository at this point in the history
…orting dynamic setting, but not implemented (apache#22684)

(cherry picked from commit ff4853e)
(cherry picked from commit 03da743)
  • Loading branch information
poorbarcode authored and nikhil-ctds committed May 13, 2024
1 parent 471ec48 commit 12c3055
Show file tree
Hide file tree
Showing 11 changed files with 633 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -576,13 +576,12 @@ public CompletableFuture<Void> closeAsync() {
}
}

closeLocalMetadataStore();
asyncCloseFutures.add(closeLocalMetadataStore());
if (configMetadataSynchronizer != null) {
asyncCloseFutures.add(configMetadataSynchronizer.closeAsync());
}
if (configurationMetadataStore != null && shouldShutdownConfigurationMetadataStore) {
configurationMetadataStore.close();
if (configMetadataSynchronizer != null) {
configMetadataSynchronizer.close();
configMetadataSynchronizer = null;
}
}

if (transactionExecutorProvider != null) {
Expand Down Expand Up @@ -1125,14 +1124,16 @@ public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchro
.build());
}

protected void closeLocalMetadataStore() throws Exception {
protected CompletableFuture<Void> closeLocalMetadataStore() throws Exception {
if (localMetadataStore != null) {
localMetadataStore.close();
}
if (localMetadataSynchronizer != null) {
localMetadataSynchronizer.close();
CompletableFuture<Void> closeSynchronizer = localMetadataSynchronizer.closeAsync();
localMetadataSynchronizer = null;
return closeSynchronizer;
}
return CompletableFuture.completedFuture(null);
}

protected void startLeaderElectionService() {
Expand Down Expand Up @@ -1936,6 +1937,71 @@ protected BrokerService newBrokerService(PulsarService pulsar) throws Exception
return new BrokerService(pulsar, ioEventLoopGroup);
}

public void initConfigMetadataSynchronizerIfNeeded() {
mutex.lock();
try {
final String newTopic = config.getConfigurationMetadataSyncEventTopic();
final PulsarMetadataEventSynchronizer oldSynchronizer = configMetadataSynchronizer;
// Skip if not support.
if (!(configurationMetadataStore instanceof MetadataStoreExtended)) {
LOG.info(
"Skip to update Metadata Synchronizer because of the Configuration Metadata Store using[{}]"
+ " does not support.", configurationMetadataStore.getClass().getName());
return;
}
// Skip if no changes.
// case-1: both null.
// case-2: both topics are the same.
if ((oldSynchronizer == null && StringUtils.isBlank(newTopic))) {
LOG.info("Skip to update Metadata Synchronizer because the topic[null] does not changed.");
}
if (StringUtils.isNotBlank(newTopic) && oldSynchronizer != null) {
TopicName newTopicName = TopicName.get(newTopic);
TopicName oldTopicName = TopicName.get(oldSynchronizer.getTopicName());
if (newTopicName.equals(oldTopicName)) {
LOG.info("Skip to update Metadata Synchronizer because the topic[{}] does not changed.",
oldTopicName);
}
}
// Update(null or not null).
// 1.set the new one.
// 2.close the old one.
// 3.async start the new one.
if (StringUtils.isBlank(newTopic)) {
configMetadataSynchronizer = null;
} else {
configMetadataSynchronizer = new PulsarMetadataEventSynchronizer(this, newTopic);
}
// close the old one and start the new one.
PulsarMetadataEventSynchronizer newSynchronizer = configMetadataSynchronizer;
MetadataStoreExtended metadataStoreExtended = (MetadataStoreExtended) configurationMetadataStore;
metadataStoreExtended.updateMetadataEventSynchronizer(newSynchronizer);
Runnable startNewSynchronizer = () -> {
if (newSynchronizer == null) {
return;
}
try {
newSynchronizer.start();
} catch (Exception e) {
// It only occurs when get internal client fails.
LOG.error("Start Metadata Synchronizer with topic {} failed.",
newTopic, e);
}
};
executor.submit(() -> {
if (oldSynchronizer != null) {
oldSynchronizer.closeAsync().whenComplete((ignore, ex) -> {
startNewSynchronizer.run();
});
} else {
startNewSynchronizer.run();
}
});
} finally {
mutex.unlock();
}
}

private CompactionServiceFactory loadCompactionServiceFactory() {
String compactionServiceFactoryClassName = config.getCompactionServiceFactoryClassName();
var compactionServiceFactory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2812,6 +2812,11 @@ private void updateConfigurationAndRegisterListeners() {
pulsar.getWebService().updateHttpRequestsFailOnUnknownPropertiesEnabled((boolean) enabled);
});

// add listener to notify web service httpRequestsFailOnUnknownPropertiesEnabled changed.
registerConfigurationListener("configurationMetadataSyncEventTopic", enabled -> {
pulsar.initConfigMetadataSynchronizerIfNeeded();
});

// add more listeners here

// (3) create dynamic-config if not exist.
Expand Down
Loading

0 comments on commit 12c3055

Please sign in to comment.