From e5d355e19166508ef8f0db23f8112c84931607dc Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 19 Sep 2024 20:24:10 +0800 Subject: [PATCH] [improve][broker] Replace ConcurrentOpenHashMap with ConcurrentHashMap in Topic classes --- .../admin/impl/PersistentTopicsBase.java | 4 +- .../pulsar/broker/service/AbstractTopic.java | 3 +- .../pulsar/broker/service/BrokerService.java | 4 +- .../apache/pulsar/broker/service/Topic.java | 7 +- .../nonpersistent/NonPersistentTopic.java | 30 ++---- .../service/persistent/PersistentTopic.java | 92 ++++--------------- .../ReplicatedSubscriptionsController.java | 3 +- ...eplicatedSubscriptionsSnapshotBuilder.java | 5 +- .../broker/service/ClusterMigrationTest.java | 9 +- .../PersistentTopicConcurrentTest.java | 9 +- .../broker/service/PersistentTopicTest.java | 30 ++---- .../service/ReplicatedSubscriptionTest.java | 5 +- .../service/ReplicatorRateLimiterTest.java | 84 +++++++++-------- .../pulsar/broker/service/ReplicatorTest.java | 29 +++--- .../service/ReplicatorTopicPoliciesTest.java | 5 +- ...ransactionalReplicateSubscriptionTest.java | 3 +- .../nonpersistent/NonPersistentTopicTest.java | 3 +- ...catedSubscriptionsSnapshotBuilderTest.java | 15 +-- .../NamespaceStatsAggregatorTest.java | 10 +- 19 files changed, 125 insertions(+), 225 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 4d04dfeda7a74..3e3e53012e3e8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1268,7 +1268,7 @@ private void resumeAsyncResponse(AsyncResponse asyncResponse, Set subscr private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse) { getTopicReferenceAsync(topicName) - .thenAccept(topic -> asyncResponse.resume(new ArrayList<>(topic.getSubscriptions().keys()))) + .thenAccept(topic -> asyncResponse.resume(topic.getSubscriptions().keySet())) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. if (isNot307And404Exception(ex)) { @@ -2020,7 +2020,7 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy new ArrayList<>((int) topic.getReplicators().size()); List subNames = new ArrayList<>((int) topic.getSubscriptions().size()); - subNames.addAll(topic.getSubscriptions().keys().stream().filter( + subNames.addAll(topic.getSubscriptions().keySet().stream().filter( subName -> !subName.equals(Compactor.COMPACTION_SUBSCRIPTION)).toList()); for (int i = 0; i < subNames.size(); i++) { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 9e5d6ef7191d1..48848586a890b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -26,6 +26,7 @@ import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.List; @@ -580,7 +581,7 @@ && getNumberOfSameAddressConsumers(consumer.getClientAddress()) >= maxSameAddres public abstract int getNumberOfSameAddressConsumers(String clientAddress); protected int getNumberOfSameAddressConsumers(final String clientAddress, - final List subscriptions) { + final Collection subscriptions) { int count = 0; if (clientAddress != null) { for (Subscription subscription : subscriptions) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index cb5e0853d53f3..3ace6a2d531a3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1208,7 +1208,7 @@ private CompletableFuture deleteTopicInternal(String topic, boolean forceD // v2 topics have a global name so check if the topic is replicated. if (t.isReplicated()) { // Delete is disallowed on global topic - final List clusters = t.getReplicators().keys(); + final var clusters = t.getReplicators().keySet(); log.error("Delete forbidden topic {} is replicated on clusters {}", topic, clusters); return FutureUtil.failedFuture( new IllegalStateException("Delete forbidden topic is replicated on clusters " + clusters)); @@ -1216,7 +1216,7 @@ private CompletableFuture deleteTopicInternal(String topic, boolean forceD // shadow topic should be deleted first. if (t.isShadowReplicated()) { - final List shadowTopics = t.getShadowReplicators().keys(); + final var shadowTopics = t.getShadowReplicators().keySet(); log.error("Delete forbidden. Topic {} is replicated to shadow topics: {}", topic, shadowTopics); return FutureUtil.failedFuture(new IllegalStateException( "Delete forbidden. Topic " + topic + " is replicated to shadow topics.")); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 3ec09e9bfcd28..ec7889af6bbbe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -44,7 +44,6 @@ import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.utils.StatsOutputStream; @@ -183,7 +182,7 @@ CompletableFuture createSubscription(String subscriptionName, Init CompletableFuture unsubscribe(String subName); - ConcurrentOpenHashMap getSubscriptions(); + Map getSubscriptions(); CompletableFuture delete(); @@ -265,9 +264,9 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats Subscription getSubscription(String subscription); - ConcurrentOpenHashMap getReplicators(); + Map getReplicators(); - ConcurrentOpenHashMap getShadowReplicators(); + Map getShadowReplicators(); TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize, boolean getEarliestTimeInBacklog); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 1b98ee2f8306d..65cf11f245641 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -33,6 +33,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -96,7 +97,6 @@ import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.utils.StatsOutputStream; import org.slf4j.Logger; @@ -105,9 +105,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPolicyListener { // Subscriptions to this topic - private final ConcurrentOpenHashMap subscriptions; + private final Map subscriptions = new ConcurrentHashMap<>(); - private final ConcurrentOpenHashMap replicators; + private final Map replicators = new ConcurrentHashMap<>(); // Ever increasing counter of entries added private static final AtomicLongFieldUpdater ENTRIES_ADDED_COUNTER_UPDATER = @@ -152,17 +152,6 @@ public void reset() { public NonPersistentTopic(String topic, BrokerService brokerService) { super(topic, brokerService); - - this.subscriptions = - ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); - this.replicators = - ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); this.isFenced = false; registerTopicPolicyListener(); } @@ -446,8 +435,8 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, boolean c if (failIfHasSubscriptions) { if (!subscriptions.isEmpty()) { isFenced = false; - deleteFuture.completeExceptionally( - new TopicBusyException("Topic has subscriptions:" + subscriptions.keys())); + deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions:" + + subscriptions.keySet().stream().toList())); return; } } else { @@ -714,18 +703,18 @@ public int getNumberOfSameAddressConsumers(final String clientAddress) { } @Override - public ConcurrentOpenHashMap getSubscriptions() { + public Map getSubscriptions() { return subscriptions; } @Override - public ConcurrentOpenHashMap getReplicators() { + public Map getReplicators() { return replicators; } @Override - public ConcurrentOpenHashMap getShadowReplicators() { - return ConcurrentOpenHashMap.emptyMap(); + public Map getShadowReplicators() { + return Map.of(); } @Override @@ -1043,7 +1032,6 @@ private CompletableFuture checkAndUnsubscribeSubscriptions() { private CompletableFuture disconnectReplicators() { List> futures = new ArrayList<>(); - ConcurrentOpenHashMap replicators = getReplicators(); replicators.forEach((r, replicator) -> { futures.add(replicator.terminate()); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index fc47889c60aac..383d89c02765e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -43,6 +43,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; @@ -186,7 +187,6 @@ import org.apache.pulsar.common.topics.TopicCompactionStrategy; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopicContext; import org.apache.pulsar.compaction.CompactedTopicImpl; import org.apache.pulsar.compaction.Compactor; @@ -206,10 +206,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal protected final ManagedLedger ledger; // Subscriptions to this topic - private final ConcurrentOpenHashMap subscriptions; + private final Map subscriptions = new ConcurrentHashMap<>(); - private final ConcurrentOpenHashMap replicators; - private final ConcurrentOpenHashMap shadowReplicators; + private final Map replicators = new ConcurrentHashMap<>(); + private final Map shadowReplicators = new ConcurrentHashMap<>(); @Getter private volatile List shadowTopics; private final TopicName shadowSourceTopic; @@ -391,18 +391,6 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS ? brokerService.getTopicOrderedExecutor().chooseThread(topic) : null; this.ledger = ledger; - this.subscriptions = ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); - this.replicators = ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); - this.shadowReplicators = ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold(); this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, ledger); @@ -475,41 +463,6 @@ public CompletableFuture initialize() { })); } - // for testing purposes - @VisibleForTesting - PersistentTopic(String topic, BrokerService brokerService, ManagedLedger ledger, - MessageDeduplication messageDeduplication) { - super(topic, brokerService); - // null check for backwards compatibility with tests which mock the broker service - this.orderedExecutor = brokerService.getTopicOrderedExecutor() != null - ? brokerService.getTopicOrderedExecutor().chooseThread(topic) - : null; - this.ledger = ledger; - this.messageDeduplication = messageDeduplication; - this.subscriptions = ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); - this.replicators = ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); - this.shadowReplicators = ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); - this.backloggedCursorThresholdEntries = - brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold(); - - if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) { - this.transactionBuffer = brokerService.getPulsar() - .getTransactionBufferProvider().newTransactionBuffer(this); - } else { - this.transactionBuffer = new TransactionBufferDisable(this); - } - shadowSourceTopic = null; - } - private void initializeDispatchRateLimiterIfNeeded() { synchronized (dispatchRateLimiterLock) { // dispatch rate limiter for topic @@ -1454,8 +1407,8 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, // In this case, we shouldn't care if the usageCount is 0 or not, just proceed if (!closeIfClientsConnected) { if (failIfHasSubscriptions && !subscriptions.isEmpty()) { - return FutureUtil.failedFuture( - new TopicBusyException("Topic has subscriptions: " + subscriptions.keys())); + return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions: " + + subscriptions.keySet().stream().toList())); } else if (failIfHasBacklogs) { if (hasBacklogs(false)) { List backlogSubs = @@ -2135,10 +2088,6 @@ protected CompletableFuture addReplicationCluster(String remoteCluster, Ma } return null; }); - // clean up replicator if startup is failed - if (replicator == null) { - replicators.removeNullValue(remoteCluster); - } } finally { lock.readLock().unlock(); } @@ -2216,11 +2165,6 @@ protected CompletableFuture addShadowReplicationCluster(String shadowTopic } return null; }); - - // clean up replicator if startup is failed - if (replicator == null) { - shadowReplicators.removeNullValue(shadowTopic); - } }); } @@ -2280,7 +2224,7 @@ protected String getSchemaId() { } @Override - public ConcurrentOpenHashMap getSubscriptions() { + public Map getSubscriptions() { return subscriptions; } @@ -2290,12 +2234,12 @@ public PersistentSubscription getSubscription(String subscriptionName) { } @Override - public ConcurrentOpenHashMap getReplicators() { + public Map getReplicators() { return replicators; } @Override - public ConcurrentOpenHashMap getShadowReplicators() { + public Map getShadowReplicators() { return shadowReplicators; } @@ -3097,7 +3041,6 @@ private CompletableFuture checkAndDisconnectProducers() { private CompletableFuture checkAndDisconnectReplicators() { List> futures = new ArrayList<>(); - ConcurrentOpenHashMap replicators = getReplicators(); replicators.forEach((r, replicator) -> { if (replicator.getNumberOfEntriesInBacklog() <= 0) { futures.add(replicator.terminate()); @@ -3112,12 +3055,9 @@ public boolean shouldProducerMigrate() { @Override public boolean isReplicationBacklogExist() { - ConcurrentOpenHashMap replicators = getReplicators(); - if (replicators != null) { - for (Replicator replicator : replicators.values()) { - if (replicator.getNumberOfEntriesInBacklog() > 0) { - return true; - } + for (Replicator replicator : replicators.values()) { + if (replicator.getNumberOfEntriesInBacklog() > 0) { + return true; } } return false; @@ -3765,9 +3705,9 @@ public boolean isOldestMessageExpired(ManagedCursor cursor, int messageTTLInSeco public CompletableFuture clearBacklog() { log.info("[{}] Clearing backlog on all cursors in the topic.", topic); List> futures = new ArrayList<>(); - List cursors = getSubscriptions().keys(); - cursors.addAll(getReplicators().keys()); - cursors.addAll(getShadowReplicators().keys()); + List cursors = new ArrayList<>(getSubscriptions().keySet()); + cursors.addAll(getReplicators().keySet()); + cursors.addAll(getShadowReplicators().keySet()); for (String cursor : cursors) { futures.add(clearBacklog(cursor)); } @@ -4167,7 +4107,7 @@ private void unfenceReplicatorsToResume() { checkShadowReplication(); } - private void removeTerminatedReplicators(ConcurrentOpenHashMap replicators) { + private void removeTerminatedReplicators(Map replicators) { Map terminatedReplicators = new HashMap<>(); replicators.forEach((cluster, replicator) -> { if (replicator.isTerminated()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index b873bc93cd1e4..f56cf9de66b75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -243,7 +243,8 @@ private void startNewSnapshot() { pendingSnapshotsMetric.inc(); stats.recordSnapshotStarted(); ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(this, - topic.getReplicators().keys(), topic.getBrokerService().pulsar().getConfiguration(), Clock.systemUTC()); + topic.getReplicators().keySet(), topic.getBrokerService().pulsar().getConfiguration(), + Clock.systemUTC()); pendingSnapshots.put(builder.getSnapshotId(), builder); builder.start(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java index 0dacade3eed1c..e08b549f8aec9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java @@ -20,7 +20,6 @@ import io.prometheus.client.Summary; import java.time.Clock; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -43,7 +42,7 @@ public class ReplicatedSubscriptionsSnapshotBuilder { private final ReplicatedSubscriptionsController controller; private final Map responses = new TreeMap<>(); - private final List remoteClusters; + private final Set remoteClusters; private final Set missingClusters; private final boolean needTwoRounds; @@ -60,7 +59,7 @@ public class ReplicatedSubscriptionsSnapshotBuilder { "Time taken to create a consistent snapshot across clusters").register(); public ReplicatedSubscriptionsSnapshotBuilder(ReplicatedSubscriptionsController controller, - List remoteClusters, ServiceConfiguration conf, Clock clock) { + Set remoteClusters, ServiceConfiguration conf, Clock clock) { this.snapshotId = UUID.randomUUID().toString(); this.controller = controller; this.remoteClusters = remoteClusters; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java index 8ec565f7d4566..e56a3495600f0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java @@ -49,7 +49,6 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -344,7 +343,7 @@ public void testClusterMigration() throws Exception { assertFalse(topic2.getSubscriptions().isEmpty()); topic1.checkClusterMigration().get(); - ConcurrentOpenHashMap replicators = topic1.getReplicators(); + final var replicators = topic1.getReplicators(); replicators.forEach((r, replicator) -> { assertFalse(replicator.isConnected()); }); @@ -798,20 +797,20 @@ public void testNamespaceMigration(SubscriptionType subType, boolean isClusterMi blueTopicNs2_1.checkClusterMigration().get(); } - ConcurrentOpenHashMap replicators = blueTopicNs1_1.getReplicators(); + final var replicators = blueTopicNs1_1.getReplicators(); replicators.forEach((r, replicator) -> { assertFalse(replicator.isConnected()); }); assertTrue(blueTopicNs1_1.getSubscriptions().isEmpty()); if (isClusterMigrate) { - ConcurrentOpenHashMap replicatorsNm = blueTopicNs2_1.getReplicators(); + final var replicatorsNm = blueTopicNs2_1.getReplicators(); replicatorsNm.forEach((r, replicator) -> { assertFalse(replicator.isConnected()); }); assertTrue(blueTopicNs2_1.getSubscriptions().isEmpty()); } else { - ConcurrentOpenHashMap replicatorsNm = blueTopicNs2_1.getReplicators(); + final var replicatorsNm = blueTopicNs2_1.getReplicators(); replicatorsNm.forEach((r, replicator) -> { assertTrue(replicator.isConnected()); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java index cbbb8808f3d1a..85e0887465db2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java @@ -53,7 +53,6 @@ import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.BeforeMethod; @@ -154,7 +153,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception { try { barrier.await(); // do subscription delete - ConcurrentOpenHashMap subscriptions = topic.getSubscriptions(); + final var subscriptions = topic.getSubscriptions(); PersistentSubscription ps = subscriptions.get(successSubName); // Thread.sleep(2,0); log.info("unsubscriber outcome is {}", ps.doUnsubscribe(ps.getConsumers().get(0)).get()); @@ -219,7 +218,7 @@ public void testConcurrentTopicGCAndSubscriptionDelete() throws Exception { try { barrier.await(); // do subscription delete - ConcurrentOpenHashMap subscriptions = topic.getSubscriptions(); + final var subscriptions = topic.getSubscriptions(); PersistentSubscription ps = subscriptions.get(successSubName); // Thread.sleep(2,0); log.info("unsubscriber outcome is {}", ps.doUnsubscribe(ps.getConsumers().get(0)).get()); @@ -278,7 +277,7 @@ public void testConcurrentTopicDeleteAndUnsubscribe() throws Exception { barrier.await(); // Thread.sleep(2,0); // assertTrue(topic.unsubscribe(successSubName).isDone()); - ConcurrentOpenHashMap subscriptions = topic.getSubscriptions(); + final var subscriptions = topic.getSubscriptions(); PersistentSubscription ps = subscriptions.get(successSubName); log.info("unsubscribe result : {}", topic.unsubscribe(successSubName).get()); log.info("closing consumer.."); @@ -339,7 +338,7 @@ public void testConcurrentTopicDeleteAndSubsUnsubscribe() throws Exception { log.info("&&&&&&&&& UNSUBSCRIBER TH"); // Thread.sleep(2,0); // assertTrue(topic.unsubscribe(successSubName).isDone()); - ConcurrentOpenHashMap subscriptions = topic.getSubscriptions(); + final var subscriptions = topic.getSubscriptions(); PersistentSubscription ps = subscriptions.get(successSubName); log.info("unsubscribe result : " + ps.doUnsubscribe(ps.getConsumers().get(0)).get()); } catch (Exception e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index f9171e883613b..2d2237a7ed876 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -64,6 +64,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; @@ -132,7 +133,6 @@ import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.CompactedTopicContext; import org.apache.pulsar.compaction.Compactor; @@ -777,11 +777,7 @@ private void testMaxConsumersShared() throws Exception { addConsumerToSubscription.setAccessible(true); // for count consumers on topic - ConcurrentOpenHashMap subscriptions = - ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); + final var subscriptions = new ConcurrentHashMap(); subscriptions.put("sub-1", sub); subscriptions.put("sub-2", sub2); Field field = topic.getClass().getDeclaredField("subscriptions"); @@ -873,11 +869,7 @@ private void testMaxConsumersFailover() throws Exception { addConsumerToSubscription.setAccessible(true); // for count consumers on topic - ConcurrentOpenHashMap subscriptions = - ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); + final var subscriptions = new ConcurrentHashMap(); subscriptions.put("sub-1", sub); subscriptions.put("sub-2", sub2); Field field = topic.getClass().getDeclaredField("subscriptions"); @@ -992,11 +984,7 @@ public void testMaxSameAddressConsumers() throws Exception { addConsumerToSubscription.setAccessible(true); // for count consumers on topic - ConcurrentOpenHashMap subscriptions = - ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); + final var subscriptions = new ConcurrentHashMap(); subscriptions.put("sub1", sub1); subscriptions.put("sub2", sub2); Field field = topic.getClass().getDeclaredField("subscriptions"); @@ -1299,7 +1287,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception { try { barrier.await(); // do subscription delete - ConcurrentOpenHashMap subscriptions = topic.getSubscriptions(); + final var subscriptions = topic.getSubscriptions(); PersistentSubscription ps = subscriptions.get(successSubName); // Thread.sleep(5,0); log.info("unsubscriber outcome is {}", ps.doUnsubscribe(ps.getConsumers().get(0)).get()); @@ -1681,7 +1669,7 @@ public void testAtomicReplicationRemoval() throws Exception { PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService); topic.initialize().join(); String remoteReplicatorName = topic.getReplicatorPrefix() + "." + remoteCluster; - ConcurrentOpenHashMap replicatorMap = topic.getReplicators(); + final var replicatorMap = topic.getReplicators(); ManagedCursor cursor = mock(ManagedCursorImpl.class); doReturn(remoteCluster).when(cursor).getName(); @@ -2018,11 +2006,7 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { public void testCheckInactiveSubscriptions() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); - ConcurrentOpenHashMap subscriptions = - ConcurrentOpenHashMap.newBuilder() - .expectedItems(16) - .concurrencyLevel(1) - .build(); + final var subscriptions = new ConcurrentHashMap(); // This subscription is connected by consumer. PersistentSubscription nonDeletableSubscription1 = spyWithClassAndConstructorArgsRecordingInvocations(PersistentSubscription.class, topic, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java index 4273e8bbaeb5b..5b896a22baa33 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java @@ -68,7 +68,6 @@ import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicStats; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -292,7 +291,7 @@ public void testReplicatedSubscribeAndSwitchToStandbyCluster() throws Exception sentMessages.add(msg); } Awaitility.await().untilAsserted(() -> { - ConcurrentOpenHashMap replicators = topic1.getReplicators(); + final var replicators = topic1.getReplicators(); assertTrue(replicators != null && replicators.size() == 1, "Replicator should started"); assertTrue(replicators.values().iterator().next().isConnected(), "Replicator should be connected"); assertTrue(topic1.getReplicatedSubscriptionController().get().getLastCompletedSnapshotId().isPresent(), @@ -1072,7 +1071,7 @@ private void testReplicatedSubscriptionWhenEnableReplication(Producer pr Awaitility.await().untilAsserted(() -> { List keys = pulsar1.getBrokerService() .getTopic(topic, false).get().get() - .getReplicators().keys(); + .getReplicators().keySet().stream().toList(); assertEquals(keys.size(), 1); assertTrue(pulsar1.getBrokerService() .getTopic(topic, false).get().get() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java index 90df16360614d..bec6b558ea401 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java @@ -25,9 +25,11 @@ import static org.testng.AssertJUnit.assertFalse; import com.google.common.collect.Sets; import java.lang.reflect.Method; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; +import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -101,7 +103,7 @@ public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); // rate limiter disable by default - assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); + assertFalse(getRateLimiter(topic).isPresent()); //set topic-level policy, which should take effect DispatchRate topicRate = DispatchRate.builder() @@ -112,16 +114,16 @@ public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception { admin1.topics().setReplicatorDispatchRate(topicName, topicRate); Awaitility.await().untilAsserted(() -> assertEquals(admin1.topics().getReplicatorDispatchRate(topicName), topicRate)); - assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L); + assertTrue(getRateLimiter(topic).isPresent()); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), 10); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), 20L); //remove topic-level policy admin1.topics().removeReplicatorDispatchRate(topicName); Awaitility.await().untilAsserted(() -> assertNull(admin1.topics().getReplicatorDispatchRate(topicName))); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), -1); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), -1); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), -1L); } @@ -145,7 +147,7 @@ public void testReplicatorRateLimiterWithOnlyNamespaceLevel() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); // rate limiter disable by default - assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); + assertFalse(getRateLimiter(topic).isPresent()); //set namespace-level policy, which should take effect DispatchRate topicRate = DispatchRate.builder() @@ -156,16 +158,16 @@ public void testReplicatorRateLimiterWithOnlyNamespaceLevel() throws Exception { admin1.namespaces().setReplicatorDispatchRate(namespace, topicRate); Awaitility.await().untilAsserted(() -> assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), topicRate)); - assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L); + assertTrue(getRateLimiter(topic).isPresent()); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), 10); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), 20L); //remove topic-level policy admin1.namespaces().removeReplicatorDispatchRate(namespace); Awaitility.await().untilAsserted(() -> assertNull(admin1.namespaces().getReplicatorDispatchRate(namespace))); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), -1); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), -1); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), -1L); } @@ -189,7 +191,7 @@ public void testReplicatorRateLimiterWithOnlyBrokerLevel() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); // rate limiter disable by default - assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); + assertFalse(getRateLimiter(topic).isPresent()); //set broker-level policy, which should take effect admin1.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerReplicatorInMsg", "10"); @@ -203,9 +205,9 @@ public void testReplicatorRateLimiterWithOnlyBrokerLevel() throws Exception { .getAllDynamicConfigurations().get("dispatchThrottlingRatePerReplicatorInByte"), "20"); }); - assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L); + assertTrue(getRateLimiter(topic).isPresent()); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), 10); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), 20L); } @Test @@ -228,9 +230,9 @@ public void testReplicatorRatePriority() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); //use broker-level by default - assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 100); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 200L); + assertTrue(getRateLimiter(topic).isPresent()); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), 100); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), 200L); //set namespace-level policy, which should take effect DispatchRate nsDispatchRate = DispatchRate.builder() @@ -241,8 +243,8 @@ public void testReplicatorRatePriority() throws Exception { admin1.namespaces().setReplicatorDispatchRate(namespace, nsDispatchRate); Awaitility.await() .untilAsserted(() -> assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), nsDispatchRate)); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 50); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 60L); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), 50); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), 60L); //set topic-level policy, which should take effect DispatchRate topicRate = DispatchRate.builder() @@ -253,8 +255,8 @@ public void testReplicatorRatePriority() throws Exception { admin1.topics().setReplicatorDispatchRate(topicName, topicRate); Awaitility.await().untilAsserted(() -> assertEquals(admin1.topics().getReplicatorDispatchRate(topicName), topicRate)); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), 10); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), 20L); //Set the namespace-level policy, which should not take effect DispatchRate nsDispatchRate2 = DispatchRate.builder() @@ -265,21 +267,21 @@ public void testReplicatorRatePriority() throws Exception { admin1.namespaces().setReplicatorDispatchRate(namespace, nsDispatchRate2); Awaitility.await() .untilAsserted(() -> assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), nsDispatchRate2)); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), 20L); //remove topic-level policy, namespace-level should take effect admin1.topics().removeReplicatorDispatchRate(topicName); Awaitility.await().untilAsserted(() -> assertNull(admin1.topics().getReplicatorDispatchRate(topicName))); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 500); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), 500); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), 600L); //remove namespace-level policy, broker-level should take effect admin1.namespaces().setReplicatorDispatchRate(namespace, null); Awaitility.await().untilAsserted(() -> - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 100)); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), + assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), 100)); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), 200L); } @@ -315,7 +317,7 @@ public void testReplicatorRateLimiterDynamicallyChange() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); // 1. default replicator throttling not configured - Assert.assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); + Assert.assertFalse(getRateLimiter(topic).isPresent()); // 2. change namespace setting of replicator dispatchRateMsg, verify topic changed. int messageRate = 100; @@ -329,7 +331,7 @@ public void testReplicatorRateLimiterDynamicallyChange() throws Exception { boolean replicatorUpdated = false; int retry = 5; for (int i = 0; i < retry; i++) { - if (topic.getReplicators().values().get(0).getRateLimiter().isPresent()) { + if (getRateLimiter(topic).isPresent()) { replicatorUpdated = true; break; } else { @@ -339,7 +341,7 @@ public void testReplicatorRateLimiterDynamicallyChange() throws Exception { } } Assert.assertTrue(replicatorUpdated); - Assert.assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), messageRate); + Assert.assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), messageRate); // 3. change namespace setting of replicator dispatchRateByte, verify topic changed. messageRate = 500; @@ -351,7 +353,7 @@ public void testReplicatorRateLimiterDynamicallyChange() throws Exception { admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRateByte); replicatorUpdated = false; for (int i = 0; i < retry; i++) { - if (topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte() == messageRate) { + if (getRateLimiter(topic).get().getDispatchRateOnByte() == messageRate) { replicatorUpdated = true; break; } else { @@ -414,7 +416,7 @@ public void testReplicatorRateLimiterMessageNotReceivedAllMessages(DispatchRateT boolean replicatorUpdated = false; int retry = 5; for (int i = 0; i < retry; i++) { - if (topic.getReplicators().values().get(0).getRateLimiter().isPresent()) { + if (getRateLimiter(topic).isPresent()) { replicatorUpdated = true; break; } else { @@ -425,9 +427,9 @@ public void testReplicatorRateLimiterMessageNotReceivedAllMessages(DispatchRateT } Assert.assertTrue(replicatorUpdated); if (DispatchRateType.messageRate.equals(dispatchRateType)) { - Assert.assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), messageRate); + Assert.assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), messageRate); } else { - Assert.assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), messageRate); + Assert.assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), messageRate); } @Cleanup @@ -499,7 +501,7 @@ public void testReplicatorRateLimiterMessageReceivedAllMessages() throws Excepti boolean replicatorUpdated = false; int retry = 5; for (int i = 0; i < retry; i++) { - if (topic.getReplicators().values().get(0).getRateLimiter().isPresent()) { + if (getRateLimiter(topic).isPresent()) { replicatorUpdated = true; break; } else { @@ -509,7 +511,7 @@ public void testReplicatorRateLimiterMessageReceivedAllMessages() throws Excepti } } Assert.assertTrue(replicatorUpdated); - Assert.assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), messageRate); + Assert.assertEquals(getRateLimiter(topic).get().getDispatchRateOnMsg(), messageRate); @Cleanup PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) @@ -578,8 +580,8 @@ public void testReplicatorRateLimiterByBytes() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); Awaitility.await() - .untilAsserted(() -> assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent())); - assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), byteRate); + .untilAsserted(() -> assertTrue(getRateLimiter(topic).isPresent())); + assertEquals(getRateLimiter(topic).get().getDispatchRateOnByte(), byteRate); @Cleanup PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()) @@ -608,5 +610,9 @@ public void testReplicatorRateLimiterByBytes() throws Exception { }); } + private static Optional getRateLimiter(PersistentTopic topic) { + return getRateLimiter(topic); + } + private static final Logger log = LoggerFactory.getLogger(ReplicatorRateLimiterTest.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 1c47abab775b3..aac7a85f477c5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -109,7 +109,6 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.apache.pulsar.schema.Schemas; import org.awaitility.Awaitility; @@ -234,11 +233,7 @@ public void testConcurrentReplicator() throws Exception { final Method startRepl = PersistentTopic.class.getDeclaredMethod("startReplicator", String.class); startRepl.setAccessible(true); - Field replClientField = BrokerService.class.getDeclaredField("replicationClients"); - replClientField.setAccessible(true); - ConcurrentOpenHashMap replicationClients = - (ConcurrentOpenHashMap) replClientField - .get(pulsar1.getBrokerService()); + final var replicationClients = pulsar1.getBrokerService().getReplicationClients(); replicationClients.put("r3", pulsarClient); admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3")); @@ -641,7 +636,7 @@ public void testReplicatePeekAndSkip() throws Exception { producer1.produce(2); PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators() - .get(topic.getReplicators().keys().get(0)); + .get(topic.getReplicators().keySet().stream().toList().get(0)); replicator.skipMessages(2); CompletableFuture result = replicator.peekNthMessage(1); Entry entry = result.get(50, TimeUnit.MILLISECONDS); @@ -668,7 +663,7 @@ public void testReplicatorClearBacklog() throws Exception { producer1.produce(2); PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); PersistentReplicator replicator = (PersistentReplicator) spy( - topic.getReplicators().get(topic.getReplicators().keys().get(0))); + topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0))); replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null); replicator.clearBacklog().get(); Thread.sleep(100); @@ -695,7 +690,7 @@ public void testResetReplicatorSubscriptionPosition() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); PersistentReplicator replicator = (PersistentReplicator) spy( - topic.getReplicators().get(topic.getReplicators().keys().get(0))); + topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0))); MessageId id = topic.getLastMessageId().get(); admin1.topics().expireMessages(dest.getPartitionedTopicName(), @@ -799,7 +794,7 @@ public void testDeleteReplicatorFailure() throws Exception { @Cleanup MessageProducer producer1 = new MessageProducer(url1, dest); PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName).get(); - final String replicatorClusterName = topic.getReplicators().keys().get(0); + final String replicatorClusterName = topic.getReplicators().keySet().stream().toList().get(0); ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger(); CountDownLatch latch = new CountDownLatch(1); // delete cursor already : so next time if topic.removeReplicator will get exception but then it should @@ -840,7 +835,7 @@ public void testReplicatorProducerClosing() throws Exception { @Cleanup MessageProducer producer1 = new MessageProducer(url1, dest); PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName).get(); - final String replicatorClusterName = topic.getReplicators().keys().get(0); + final String replicatorClusterName = topic.getReplicators().keySet().stream().toList().get(0); Replicator replicator = topic.getPersistentReplicator(replicatorClusterName); pulsar2.close(); pulsar2 = null; @@ -1679,7 +1674,7 @@ public void testReplicatorWithFailedAck() throws Exception { Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS) .ignoreExceptions() .untilAsserted(() -> { - ConcurrentOpenHashMap replicators = topic.getReplicators(); + final var replicators = topic.getReplicators(); PersistentReplicator replicator = (PersistentReplicator) replicators.get("r2"); assertEquals(org.apache.pulsar.broker.service.AbstractReplicator.State.Started, replicator.getState()); @@ -1932,9 +1927,9 @@ public void testEnableReplicationWithNamespaceAllowedClustersPolices() throws Ex // Verify the replication from cluster1 to cluster2 is ready, but the replication form the cluster2 to cluster1 // is not ready. Awaitility.await().untilAsserted(() -> { - ConcurrentOpenHashMap replicatorMap = persistentTopic1.getReplicators(); + final var replicatorMap = persistentTopic1.getReplicators(); assertEquals(replicatorMap.size(), 1); - Replicator replicator = replicatorMap.get(replicatorMap.keys().get(0)); + Replicator replicator = replicatorMap.get(replicatorMap.keySet().stream().toList().get(0)); assertTrue(replicator.isConnected()); }); @@ -1944,16 +1939,16 @@ public void testEnableReplicationWithNamespaceAllowedClustersPolices() throws Ex .get(); Awaitility.await().untilAsserted(() -> { - ConcurrentOpenHashMap replicatorMap = persistentTopic2.getReplicators(); + final var replicatorMap = persistentTopic2.getReplicators(); assertEquals(replicatorMap.size(), 0); }); // Enable replication at the topic level in the cluster2. admin2.topics().setReplicationClusters(topicName.toString(), List.of("r1", "r2")); // Verify the replication between cluster1 and cluster2 is ready. Awaitility.await().untilAsserted(() -> { - ConcurrentOpenHashMap replicatorMap = persistentTopic2.getReplicators(); + final var replicatorMap = persistentTopic2.getReplicators(); assertEquals(replicatorMap.size(), 1); - Replicator replicator = replicatorMap.get(replicatorMap.keys().get(0)); + Replicator replicator = replicatorMap.get(replicatorMap.keySet().stream().toList().get(0)); assertTrue(replicator.isConnected()); }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java index c0281f073cfd4..80b9182b2c32a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Set; import java.util.UUID; -import java.util.stream.Collectors; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -750,7 +749,7 @@ public void testRemoveReplicationClusters() throws Exception { assertNotNull(topicRef); Awaitility.await().untilAsserted(() -> { - List replicaClusters = topicRef.getReplicators().keys().stream().sorted().collect(Collectors.toList()); + List replicaClusters = topicRef.getReplicators().keySet().stream().sorted().toList(); assertEquals(replicaClusters.size(), 1); assertEquals(replicaClusters.toString(), "[r2]"); }); @@ -758,7 +757,7 @@ public void testRemoveReplicationClusters() throws Exception { // removing topic replica cluster policy, so namespace policy should take effect admin1.topics().removeReplicationClusters(persistentTopicName); Awaitility.await().untilAsserted(() -> { - List replicaClusters = topicRef.getReplicators().keys().stream().sorted().collect(Collectors.toList()); + List replicaClusters = topicRef.getReplicators().keySet().stream().sorted().toList(); assertEquals(replicaClusters.size(), 2); assertEquals(replicaClusters.toString(), "[r2, r3]"); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java index 2d348f8259746..aa39e859a8c3d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java @@ -32,7 +32,6 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; @@ -118,7 +117,7 @@ public void testReplicatedSubscribeAndSwitchToStandbyClusterWithTransaction() th } txn1.commit().get(); Awaitility.await().untilAsserted(() -> { - ConcurrentOpenHashMap replicators = topic1.getReplicators(); + final var replicators = topic1.getReplicators(); assertTrue(replicators != null && replicators.size() == 1, "Replicator should started"); assertTrue(replicators.values().iterator().next().isConnected(), "Replicator should be connected"); assertTrue(topic1.getReplicatedSubscriptionController().get().getLastCompletedSnapshotId().isPresent(), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java index e2aec70fb114e..e0d6a432bdad2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java @@ -37,7 +37,6 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicStats; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; import org.mockito.Mockito; import org.testng.annotations.AfterMethod; @@ -212,7 +211,7 @@ public void testSubscriptionsOnNonPersistentTopic() throws Exception { .subscriptionMode(SubscriptionMode.Durable) .subscribe(); - ConcurrentOpenHashMap subscriptionMap = mockTopic.getSubscriptions(); + final var subscriptionMap = mockTopic.getSubscriptions(); assertEquals(subscriptionMap.size(), 4); // Check exclusive subscription diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java index 562c5eda58109..fa409832fc17b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java @@ -28,9 +28,8 @@ import io.netty.buffer.ByteBuf; import java.time.Clock; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.List; +import java.util.Set; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; @@ -75,10 +74,8 @@ public void setup() { @Test public void testBuildSnapshotWith2Clusters() throws Exception { - List remoteClusters = Collections.singletonList("b"); - ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(controller, - remoteClusters, + Set.of("b"), conf, clock); assertTrue(markers.isEmpty()); @@ -115,10 +112,8 @@ public void testBuildSnapshotWith2Clusters() throws Exception { @Test public void testBuildSnapshotWith3Clusters() throws Exception { - List remoteClusters = Arrays.asList("b", "c"); - ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(controller, - remoteClusters, + Set.of("b", "c"), conf, clock); assertTrue(markers.isEmpty()); @@ -198,10 +193,8 @@ public void testBuildSnapshotWith3Clusters() throws Exception { @Test public void testBuildTimeout() { - List remoteClusters = Collections.singletonList("b"); - ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(controller, - remoteClusters, + Set.of("b"), conf, clock); assertFalse(builder.isTimedOut()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java index cf923df0411dd..1e036cf580818 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.when; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl; import org.apache.bookkeeper.mledger.util.StatsBuckets; @@ -28,9 +29,8 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Consumer; -import org.apache.pulsar.broker.service.Replicator; -import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopicMetrics; import org.apache.pulsar.common.policies.data.BacklogQuota; @@ -72,7 +72,7 @@ public void testGenerateSubscriptionsStats() { // prepare multi-layer topic map ConcurrentOpenHashMap bundlesMap = ConcurrentOpenHashMap.newBuilder().build(); ConcurrentOpenHashMap topicsMap = ConcurrentOpenHashMap.newBuilder().build(); - ConcurrentOpenHashMap subscriptionsMaps = ConcurrentOpenHashMap.newBuilder().build(); + final var subscriptionsMaps = new ConcurrentHashMap(); bundlesMap.put("my-bundle", topicsMap); multiLayerTopicsMap.put(namespace, bundlesMap); @@ -88,7 +88,7 @@ public void testGenerateSubscriptionsStats() { // Prepare topic and subscription PersistentTopic topic = Mockito.mock(PersistentTopic.class); - Subscription subscription = Mockito.mock(Subscription.class); + PersistentSubscription subscription = Mockito.mock(PersistentSubscription.class); Consumer consumer = Mockito.mock(Consumer.class); ConsumerStatsImpl consumerStats = new ConsumerStatsImpl(); when(consumer.getStats()).thenReturn(consumerStats); @@ -100,7 +100,7 @@ public void testGenerateSubscriptionsStats() { when(topic.getStats(false, false, false)).thenReturn(topicStats); when(topic.getBrokerService()).thenReturn(broker); when(topic.getSubscriptions()).thenReturn(subscriptionsMaps); - when(topic.getReplicators()).thenReturn(ConcurrentOpenHashMap.newBuilder().build()); + when(topic.getReplicators()).thenReturn(new ConcurrentHashMap<>()); when(topic.getManagedLedger()).thenReturn(ml); when(topic.getBacklogQuota(Mockito.any())).thenReturn(Mockito.mock(BacklogQuota.class)); PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics();