diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeServiceImpl.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeServiceImpl.java index 641ffd59..7f28f1f7 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeServiceImpl.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeServiceImpl.java @@ -19,10 +19,11 @@ import static io.streamnative.pulsar.handlers.amqp.utils.ExchangeUtil.isDefaultExchange; import io.streamnative.pulsar.handlers.amqp.common.exception.AoPException; -import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.naming.NamespaceName; @@ -168,8 +169,8 @@ public CompletableFuture exchangeBound(NamespaceName namespaceName, Str if (null == amqpExchange) { replyCode = ExchangeBoundOkBody.EXCHANGE_NOT_FOUND; } else { - List subs = amqpExchange.getTopic().getSubscriptions().keys(); - if (null == subs || subs.isEmpty()) { + Set subs = amqpExchange.getTopic().getSubscriptions().keySet(); + if (CollectionUtils.isEmpty(subs)) { replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND; } else { replyCode = ExchangeBoundOkBody.OK; diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java index c523a634..1fc0f76d 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import lombok.AllArgsConstructor; import lombok.Data; @@ -54,7 +55,6 @@ import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; /** * Persistent Exchange. @@ -72,7 +72,7 @@ public class PersistentExchange extends AbstractAmqpExchange { private static final String BINDINGS = "BINDINGS"; private PersistentTopic persistentTopic; - private final ConcurrentOpenHashMap> cursors; + private final ConcurrentHashMap> cursors; private AmqpExchangeReplicator messageReplicator; private AmqpEntryWriter amqpEntryWriter; @@ -97,7 +97,7 @@ public PersistentExchange(String exchangeName, Type type, PersistentTopic persis super(exchangeName, type, Sets.newConcurrentHashSet(), durable, autoDelete, internal, arguments); this.persistentTopic = persistentTopic; topicNameValidate(); - cursors = new ConcurrentOpenHashMap<>(16, 1); + cursors = new ConcurrentHashMap<>(16, 1); for (ManagedCursor cursor : persistentTopic.getManagedLedger().getCursors()) { cursors.put(cursor.getName(), CompletableFuture.completedFuture(cursor)); log.info("PersistentExchange {} recover cursor {}", persistentTopic.getName(), cursor.toString()); diff --git a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java index 3c288798..a8e2cd8c 100644 --- a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java +++ b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import lombok.extern.log4j.Log4j2; import org.apache.bookkeeper.common.util.OrderedExecutor; @@ -59,7 +60,6 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.qpid.server.protocol.ProtocolVersion; import org.apache.qpid.server.protocol.v0_8.AMQShortString; import org.apache.qpid.server.protocol.v0_8.transport.AMQBody; @@ -189,7 +189,7 @@ private void initMockAmqpTopicManager(){ Mockito.any(), Mockito.anyBoolean(), Mockito.any())).thenReturn(subFuture); when(subscription.getDispatcher()).thenReturn(mock(Dispatcher.class)); when(subscription.addConsumer(Mockito.any())).thenReturn(CompletableFuture.completedFuture(null)); - when(persistentTopic.getSubscriptions()).thenReturn(new ConcurrentOpenHashMap<>()); + when(persistentTopic.getSubscriptions()).thenReturn(new ConcurrentHashMap<>()); ManagedLedger managedLedger = mock(ManagedLedgerImpl.class); when(managedLedger.getCursors()).thenReturn(new ManagedCursorContainer()); when(persistentTopic.getManagedLedger()).thenReturn(managedLedger); diff --git a/pom.xml b/pom.xml index 0d9ce9b1..bf481ee1 100644 --- a/pom.xml +++ b/pom.xml @@ -40,7 +40,7 @@ ${maven.compiler.target} - 4.0.0-ursa-4-SNAPSHOT + 4.0.0-ursa-10-SNAPSHOT 8.0.0 5.8.0