Skip to content

Commit

Permalink
[improve][broker] Replace ConcurrentOpenHashMap with ConcurrentHashMa…
Browse files Browse the repository at this point in the history
…p in Topic classes
  • Loading branch information
BewareMyPower committed Sep 19, 2024
1 parent 03330b3 commit e5d355e
Show file tree
Hide file tree
Showing 19 changed files with 125 additions and 225 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1268,7 +1268,7 @@ private void resumeAsyncResponse(AsyncResponse asyncResponse, Set<String> subscr

private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse) {
getTopicReferenceAsync(topicName)
.thenAccept(topic -> asyncResponse.resume(new ArrayList<>(topic.getSubscriptions().keys())))
.thenAccept(topic -> asyncResponse.resume(topic.getSubscriptions().keySet()))
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (isNot307And404Exception(ex)) {
Expand Down Expand Up @@ -2020,7 +2020,7 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
new ArrayList<>((int) topic.getReplicators().size());
List<String> subNames =
new ArrayList<>((int) topic.getSubscriptions().size());
subNames.addAll(topic.getSubscriptions().keys().stream().filter(
subNames.addAll(topic.getSubscriptions().keySet().stream().filter(
subName -> !subName.equals(Compactor.COMPACTION_SUBSCRIPTION)).toList());
for (int i = 0; i < subNames.size(); i++) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
Expand Down Expand Up @@ -580,7 +581,7 @@ && getNumberOfSameAddressConsumers(consumer.getClientAddress()) >= maxSameAddres
public abstract int getNumberOfSameAddressConsumers(String clientAddress);

protected int getNumberOfSameAddressConsumers(final String clientAddress,
final List<? extends Subscription> subscriptions) {
final Collection<? extends Subscription> subscriptions) {
int count = 0;
if (clientAddress != null) {
for (Subscription subscription : subscriptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1208,15 +1208,15 @@ private CompletableFuture<Void> deleteTopicInternal(String topic, boolean forceD
// v2 topics have a global name so check if the topic is replicated.
if (t.isReplicated()) {
// Delete is disallowed on global topic
final List<String> clusters = t.getReplicators().keys();
final var clusters = t.getReplicators().keySet();
log.error("Delete forbidden topic {} is replicated on clusters {}", topic, clusters);
return FutureUtil.failedFuture(
new IllegalStateException("Delete forbidden topic is replicated on clusters " + clusters));
}

// shadow topic should be deleted first.
if (t.isShadowReplicated()) {
final List<String> shadowTopics = t.getShadowReplicators().keys();
final var shadowTopics = t.getShadowReplicators().keySet();
log.error("Delete forbidden. Topic {} is replicated to shadow topics: {}", topic, shadowTopics);
return FutureUtil.failedFuture(new IllegalStateException(
"Delete forbidden. Topic " + topic + " is replicated to shadow topics."));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;

Expand Down Expand Up @@ -183,7 +182,7 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

CompletableFuture<Void> unsubscribe(String subName);

ConcurrentOpenHashMap<String, ? extends Subscription> getSubscriptions();
Map<String, ? extends Subscription> getSubscriptions();

CompletableFuture<Void> delete();

Expand Down Expand Up @@ -265,9 +264,9 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats

Subscription getSubscription(String subscription);

ConcurrentOpenHashMap<String, ? extends Replicator> getReplicators();
Map<String, ? extends Replicator> getReplicators();

ConcurrentOpenHashMap<String, ? extends Replicator> getShadowReplicators();
Map<String, ? extends Replicator> getShadowReplicators();

TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand Down Expand Up @@ -96,7 +97,6 @@
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
import org.slf4j.Logger;
Expand All @@ -105,9 +105,9 @@
public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPolicyListener<TopicPolicies> {

// Subscriptions to this topic
private final ConcurrentOpenHashMap<String, NonPersistentSubscription> subscriptions;
private final Map<String, NonPersistentSubscription> subscriptions = new ConcurrentHashMap<>();

private final ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators;
private final Map<String, NonPersistentReplicator> replicators = new ConcurrentHashMap<>();

// Ever increasing counter of entries added
private static final AtomicLongFieldUpdater<NonPersistentTopic> ENTRIES_ADDED_COUNTER_UPDATER =
Expand Down Expand Up @@ -152,17 +152,6 @@ public void reset() {

public NonPersistentTopic(String topic, BrokerService brokerService) {
super(topic, brokerService);

this.subscriptions =
ConcurrentOpenHashMap.<String, NonPersistentSubscription>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.replicators =
ConcurrentOpenHashMap.<String, NonPersistentReplicator>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.isFenced = false;
registerTopicPolicyListener();
}
Expand Down Expand Up @@ -446,8 +435,8 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
if (failIfHasSubscriptions) {
if (!subscriptions.isEmpty()) {
isFenced = false;
deleteFuture.completeExceptionally(
new TopicBusyException("Topic has subscriptions:" + subscriptions.keys()));
deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions:"
+ subscriptions.keySet().stream().toList()));
return;
}
} else {
Expand Down Expand Up @@ -714,18 +703,18 @@ public int getNumberOfSameAddressConsumers(final String clientAddress) {
}

@Override
public ConcurrentOpenHashMap<String, NonPersistentSubscription> getSubscriptions() {
public Map<String, NonPersistentSubscription> getSubscriptions() {
return subscriptions;
}

@Override
public ConcurrentOpenHashMap<String, NonPersistentReplicator> getReplicators() {
public Map<String, NonPersistentReplicator> getReplicators() {
return replicators;
}

@Override
public ConcurrentOpenHashMap<String, ? extends Replicator> getShadowReplicators() {
return ConcurrentOpenHashMap.emptyMap();
public Map<String, ? extends Replicator> getShadowReplicators() {
return Map.of();
}

@Override
Expand Down Expand Up @@ -1043,7 +1032,6 @@ private CompletableFuture<Void> checkAndUnsubscribeSubscriptions() {

private CompletableFuture<Void> disconnectReplicators() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators = getReplicators();
replicators.forEach((r, replicator) -> {
futures.add(replicator.terminate());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -186,7 +187,6 @@
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
Expand All @@ -206,10 +206,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
protected final ManagedLedger ledger;

// Subscriptions to this topic
private final ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions;
private final Map<String, PersistentSubscription> subscriptions = new ConcurrentHashMap<>();

private final ConcurrentOpenHashMap<String/*RemoteCluster*/, Replicator> replicators;
private final ConcurrentOpenHashMap<String/*ShadowTopic*/, Replicator> shadowReplicators;
private final Map<String/*RemoteCluster*/, Replicator> replicators = new ConcurrentHashMap<>();
private final Map<String/*ShadowTopic*/, Replicator> shadowReplicators = new ConcurrentHashMap<>();
@Getter
private volatile List<String> shadowTopics;
private final TopicName shadowSourceTopic;
Expand Down Expand Up @@ -391,18 +391,6 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
? brokerService.getTopicOrderedExecutor().chooseThread(topic)
: null;
this.ledger = ledger;
this.subscriptions = ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.replicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.shadowReplicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, ledger);
Expand Down Expand Up @@ -475,41 +463,6 @@ public CompletableFuture<Void> initialize() {
}));
}

// for testing purposes
@VisibleForTesting
PersistentTopic(String topic, BrokerService brokerService, ManagedLedger ledger,
MessageDeduplication messageDeduplication) {
super(topic, brokerService);
// null check for backwards compatibility with tests which mock the broker service
this.orderedExecutor = brokerService.getTopicOrderedExecutor() != null
? brokerService.getTopicOrderedExecutor().chooseThread(topic)
: null;
this.ledger = ledger;
this.messageDeduplication = messageDeduplication;
this.subscriptions = ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.replicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.shadowReplicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();

if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
this.transactionBuffer = brokerService.getPulsar()
.getTransactionBufferProvider().newTransactionBuffer(this);
} else {
this.transactionBuffer = new TransactionBufferDisable(this);
}
shadowSourceTopic = null;
}

private void initializeDispatchRateLimiterIfNeeded() {
synchronized (dispatchRateLimiterLock) {
// dispatch rate limiter for topic
Expand Down Expand Up @@ -1454,8 +1407,8 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
// In this case, we shouldn't care if the usageCount is 0 or not, just proceed
if (!closeIfClientsConnected) {
if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
return FutureUtil.failedFuture(
new TopicBusyException("Topic has subscriptions: " + subscriptions.keys()));
return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions: "
+ subscriptions.keySet().stream().toList()));
} else if (failIfHasBacklogs) {
if (hasBacklogs(false)) {
List<String> backlogSubs =
Expand Down Expand Up @@ -2135,10 +2088,6 @@ protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, Ma
}
return null;
});
// clean up replicator if startup is failed
if (replicator == null) {
replicators.removeNullValue(remoteCluster);
}
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -2216,11 +2165,6 @@ protected CompletableFuture<Void> addShadowReplicationCluster(String shadowTopic
}
return null;
});

// clean up replicator if startup is failed
if (replicator == null) {
shadowReplicators.removeNullValue(shadowTopic);
}
});
}

Expand Down Expand Up @@ -2280,7 +2224,7 @@ protected String getSchemaId() {
}

@Override
public ConcurrentOpenHashMap<String, PersistentSubscription> getSubscriptions() {
public Map<String, PersistentSubscription> getSubscriptions() {
return subscriptions;
}

Expand All @@ -2290,12 +2234,12 @@ public PersistentSubscription getSubscription(String subscriptionName) {
}

@Override
public ConcurrentOpenHashMap<String, Replicator> getReplicators() {
public Map<String, Replicator> getReplicators() {
return replicators;
}

@Override
public ConcurrentOpenHashMap<String, Replicator> getShadowReplicators() {
public Map<String, Replicator> getShadowReplicators() {
return shadowReplicators;
}

Expand Down Expand Up @@ -3097,7 +3041,6 @@ private CompletableFuture<Void> checkAndDisconnectProducers() {

private CompletableFuture<Void> checkAndDisconnectReplicators() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
ConcurrentOpenHashMap<String, Replicator> replicators = getReplicators();
replicators.forEach((r, replicator) -> {
if (replicator.getNumberOfEntriesInBacklog() <= 0) {
futures.add(replicator.terminate());
Expand All @@ -3112,12 +3055,9 @@ public boolean shouldProducerMigrate() {

@Override
public boolean isReplicationBacklogExist() {
ConcurrentOpenHashMap<String, Replicator> replicators = getReplicators();
if (replicators != null) {
for (Replicator replicator : replicators.values()) {
if (replicator.getNumberOfEntriesInBacklog() > 0) {
return true;
}
for (Replicator replicator : replicators.values()) {
if (replicator.getNumberOfEntriesInBacklog() > 0) {
return true;
}
}
return false;
Expand Down Expand Up @@ -3765,9 +3705,9 @@ public boolean isOldestMessageExpired(ManagedCursor cursor, int messageTTLInSeco
public CompletableFuture<Void> clearBacklog() {
log.info("[{}] Clearing backlog on all cursors in the topic.", topic);
List<CompletableFuture<Void>> futures = new ArrayList<>();
List<String> cursors = getSubscriptions().keys();
cursors.addAll(getReplicators().keys());
cursors.addAll(getShadowReplicators().keys());
List<String> cursors = new ArrayList<>(getSubscriptions().keySet());
cursors.addAll(getReplicators().keySet());
cursors.addAll(getShadowReplicators().keySet());
for (String cursor : cursors) {
futures.add(clearBacklog(cursor));
}
Expand Down Expand Up @@ -4167,7 +4107,7 @@ private void unfenceReplicatorsToResume() {
checkShadowReplication();
}

private void removeTerminatedReplicators(ConcurrentOpenHashMap<String, Replicator> replicators) {
private void removeTerminatedReplicators(Map<String, Replicator> replicators) {
Map<String, Replicator> terminatedReplicators = new HashMap<>();
replicators.forEach((cluster, replicator) -> {
if (replicator.isTerminated()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ private void startNewSnapshot() {
pendingSnapshotsMetric.inc();
stats.recordSnapshotStarted();
ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(this,
topic.getReplicators().keys(), topic.getBrokerService().pulsar().getConfiguration(), Clock.systemUTC());
topic.getReplicators().keySet(), topic.getBrokerService().pulsar().getConfiguration(),
Clock.systemUTC());
pendingSnapshots.put(builder.getSnapshotId(), builder);
builder.start();
}
Expand Down
Loading

0 comments on commit e5d355e

Please sign in to comment.