Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix] [broker] Fix thousands orphan PersistentTopic caused OOM (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode authored and Technoboy- committed Nov 11, 2023
1 parent d386d14 commit 467e9c0
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1747,16 +1747,14 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
// Check create persistent topic timeout.
log.warn("{} future is already completed with failure {}, closing the"
+ " topic", topic, FutureUtil.getException(topicFuture));
persistentTopic.getTransactionBuffer()
.closeAsync()
.exceptionally(t -> {
log.error("[{}] Close transactionBuffer failed", topic, t);
return null;
});
persistentTopic.stopReplProducers()
.whenCompleteAsync((v, exception) -> {
topics.remove(topic, topicFuture);
}, executor());
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, ex) -> {
if (ex != null) {
log.warn("[{}] Get an error when closing topic.",
topic, ex);
}
});
});
} else {
addTopicToStatsMaps(topicName, persistentTopic);
topicFuture.complete(Optional.of(persistentTopic));
Expand All @@ -1765,16 +1763,15 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
.exceptionally((ex) -> {
log.warn("Replication or dedup check failed."
+ " Removing topic from topics list {}, {}", topic, ex);
persistentTopic.getTransactionBuffer()
.closeAsync()
.exceptionally(t -> {
log.error("[{}] Close transactionBuffer failed", topic, t);
return null;
});
persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> {
topics.remove(topic, topicFuture);
topicFuture.completeExceptionally(ex);
}, executor());
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, closeEx) -> {
if (closeEx != null) {
log.warn("[{}] Get an error when closing topic.",
topic, closeEx);
}
topicFuture.completeExceptionally(ex);
});
});
return null;
});
} catch (PulsarServerException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker-api")
public class OrphanPersistentTopicTest extends ProducerConsumerBase {

@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testNoOrphanTopicAfterCreateTimeout() throws Exception {
// Make the topic loading timeout faster.
int topicLoadTimeoutSeconds = 2;
long originalTopicLoadTimeoutSeconds = pulsar.getConfig().getTopicLoadTimeoutSeconds();
pulsar.getConfig().setTopicLoadTimeoutSeconds(2);

String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + TopicName.get(tpName).getPersistenceNamingEncoding();

// Make topic load timeout 5 times.
AtomicInteger timeoutCounter = new AtomicInteger();
for (int i = 0; i < 5; i++) {
mockZooKeeper.delay(topicLoadTimeoutSeconds * 2 * 1000, (op, path) -> {
if (mlPath.equals(path)) {
log.info("Topic load timeout: " + timeoutCounter.incrementAndGet());
return true;
}
return false;
});
}

// Load topic.
CompletableFuture<Consumer<byte[]>> consumer = pulsarClient.newConsumer()
.topic(tpName)
.subscriptionName("my-sub")
.subscribeAsync();

// After create timeout 5 times, the topic will be created successful.
Awaitility.await().ignoreExceptions().atMost(40, TimeUnit.SECONDS).untilAsserted(() -> {
CompletableFuture<Optional<Topic>> future = pulsar.getBrokerService().getTopic(tpName, false);
assertTrue(future.isDone());
Optional<Topic> optional = future.get();
assertTrue(optional.isPresent());
});

// Assert only one PersistentTopic was not closed.
TopicPoliciesService topicPoliciesService = pulsar.getTopicPoliciesService();
Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners =
WhiteboxImpl.getInternalState(topicPoliciesService, "listeners");
assertEquals(listeners.get(TopicName.get(tpName)).size(), 1);

// cleanup.
consumer.join().close();
admin.topics().delete(tpName, false);
pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds);
}

@Test
public void testNoOrphanTopicIfInitFailed() throws Exception {
String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createNonPartitionedTopic(tpName);

// Load topic.
Consumer consumer = pulsarClient.newConsumer()
.topic(tpName)
.subscriptionName("my-sub")
.subscribe();

// Make the method `PersitentTopic.initialize` fail.
Field fieldCompactionServiceFactory = PulsarService.class.getDeclaredField("compactionServiceFactory");
fieldCompactionServiceFactory.setAccessible(true);
CompactionServiceFactory compactionServiceFactory =
(CompactionServiceFactory) fieldCompactionServiceFactory.get(pulsar);
fieldCompactionServiceFactory.set(pulsar, null);
admin.topics().unload(tpName);

// Wait for failed to create topic for several times.
Thread.sleep(5 * 1000);

// Remove the injected error, the topic will be created successful.
fieldCompactionServiceFactory.set(pulsar, compactionServiceFactory);
// We do not know the next time of consumer reconnection, so wait for 2 minutes to avoid flaky. It will be
// very fast in normal.
Awaitility.await().ignoreExceptions().atMost(120, TimeUnit.SECONDS).untilAsserted(() -> {
CompletableFuture<Optional<Topic>> future = pulsar.getBrokerService().getTopic(tpName, false);
assertTrue(future.isDone());
Optional<Topic> optional = future.get();
assertTrue(optional.isPresent());
});

// Assert only one PersistentTopic was not closed.
TopicPoliciesService topicPoliciesService = pulsar.getTopicPoliciesService();
Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners =
WhiteboxImpl.getInternalState(topicPoliciesService, "listeners");
assertEquals(listeners.get(TopicName.get(tpName)).size(), 1);

// cleanup.
consumer.close();
admin.topics().delete(tpName, false);
}
}
14 changes: 13 additions & 1 deletion testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,7 @@ Optional<KeeperException.Code> programmedFailure(Op op, String path) {
Optional<Failure> failure = failures.stream().filter(f -> f.predicate.test(op, path)).findFirst();
if (failure.isPresent()) {
failures.remove(failure.get());
return Optional.of(failure.get().failReturnCode);
return Optional.ofNullable(failure.get().failReturnCode);
} else {
return Optional.empty();
}
Expand All @@ -1131,6 +1131,18 @@ public void failConditional(KeeperException.Code rc, BiPredicate<Op, String> pre
failures.add(new Failure(rc, predicate));
}

public void delay(long millis, BiPredicate<Op, String> predicate) {
failures.add(new Failure(null, (op, s) -> {
if (predicate.test(op, s)) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {}
return true;
}
return false;
}));
}

public void setAlwaysFail(KeeperException.Code rc) {
this.alwaysFail.set(rc);
}
Expand Down

0 comments on commit 467e9c0

Please sign in to comment.