clientCnxFuture : clientWitBinaryLookup.getCnxPool().getConnections()) {
+ field.set(clientCnxFuture.get(), false);
+ }
+ try {
+ clientWitBinaryLookup.getPartitionsForTopic(topic, false).join();
+ Assert.fail("Expected an error that the broker version is too old.");
+ } catch (Exception ex) {
+ Assert.assertTrue(ex.getMessage().contains("without auto-creation is not supported from the broker"));
+ }
+
+ // cleanup.
+ producer.close();
+ clientWitBinaryLookup.close();
+ admin.topics().delete(topic, false);
+ }
}
diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
index bfbbfb7487c42..36f5869d73de6 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
@@ -17,17 +17,17 @@
# under the License.
#
-applicationName="pulsar_broker"
-zookeeperServers="localhost"
-configurationStoreServers="localhost"
+applicationName=pulsar_broker
+zookeeperServers=localhost
+configurationStoreServers=localhost
brokerServicePort=6650
-brokerServicePortTls=6651
+brokerServicePortTls=
webServicePort=8080
-webServicePortTls=4443
+webServicePortTls=
httpMaxRequestHeaderSize=1234
bindAddress=0.0.0.0
advertisedAddress=
-clusterName="test_cluster"
+clusterName=test_cluster
brokerShutdownTimeoutMs=3000
backlogQuotaCheckEnabled=true
backlogQuotaCheckIntervalInSeconds=60
@@ -42,17 +42,17 @@ clientLibraryVersionCheckEnabled=false
clientLibraryVersionCheckAllowUnversioned=true
statusFilePath=/tmp/status.html
tlsEnabled=false
-tlsCertificateFilePath=/usr/local/conf/pulsar/server.crt
-tlsKeyFilePath=/home/local/conf/pulsar/server.key
+tlsCertificateFilePath=
+tlsKeyFilePath=
tlsTrustCertsFilePath=
tlsAllowInsecureConnection=false
authenticationEnabled=false
authorizationEnabled=false
-superUserRoles="test_user"
-brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled"
+superUserRoles=test_user
+brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationDisabled
brokerClientAuthenticationParameters=
-bookkeeperClientAuthenticationPlugin="test_auth_plugin"
-bookkeeperClientAuthenticationAppId="test_auth_id"
+bookkeeperClientAuthenticationPlugin=
+bookkeeperClientAuthenticationAppId=test_auth_id
bookkeeperClientTimeoutInSeconds=30
bookkeeperClientSpeculativeReadTimeoutInMillis=0
bookkeeperClientHealthCheckEnabled=true
@@ -64,7 +64,7 @@ bookkeeperClientRegionawarePolicyEnabled=false
bookkeeperClientMinNumRacksPerWriteQuorum=2
bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false
bookkeeperClientReorderReadSequenceEnabled=false
-bookkeeperClientIsolationGroups="test_group"
+bookkeeperClientIsolationGroups=test_group
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=2
managedLedgerDefaultAckQuorum=2
diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
index c733409fc0043..0748418be6390 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
@@ -17,18 +17,18 @@
# under the License.
#
-applicationName="pulsar_broker"
-metadataStoreUrl="zk:localhost:2181/ledger"
-configurationMetadataStoreUrl="zk:localhost:2181"
-brokerServicePort=6650
-brokerServicePortTls=6651
-webServicePort=8080
-webServicePortTls=4443
+applicationName=pulsar_broker
+metadataStoreUrl=zk:localhost:2181/ledger
+configurationMetadataStoreUrl=zk:localhost:2181
+brokerServicePort=0
+brokerServicePortTls=
+webServicePort=0
+webServicePortTls=
bindAddress=0.0.0.0
advertisedAddress=
advertisedListeners=internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651
internalListenerName=internal
-clusterName="test_cluster"
+clusterName=test_cluster
brokerShutdownTimeoutMs=3000
backlogQuotaCheckEnabled=true
backlogQuotaCheckIntervalInSeconds=60
@@ -49,11 +49,11 @@ tlsTrustCertsFilePath=
tlsAllowInsecureConnection=false
authenticationEnabled=false
authorizationEnabled=false
-superUserRoles="test_user"
-brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled"
+superUserRoles=test_user
+brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationDisabled
brokerClientAuthenticationParameters=
-bookkeeperClientAuthenticationPlugin="test_auth_plugin"
-bookkeeperClientAuthenticationAppId="test_auth_id"
+bookkeeperClientAuthenticationPlugin=
+bookkeeperClientAuthenticationAppId=
bookkeeperClientTimeoutInSeconds=30
bookkeeperClientSpeculativeReadTimeoutInMillis=0
bookkeeperClientHealthCheckEnabled=true
@@ -65,7 +65,7 @@ bookkeeperClientRegionawarePolicyEnabled=false
bookkeeperClientMinNumRacksPerWriteQuorum=2
bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false
bookkeeperClientReorderReadSequenceEnabled=false
-bookkeeperClientIsolationGroups="test_group"
+bookkeeperClientIsolationGroups=
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=2
managedLedgerDefaultAckQuorum=2
diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf
index d8b26bbbfa99d..46c876686b05b 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf
@@ -17,19 +17,19 @@
# under the License.
#
-applicationName="pulsar_broker"
+applicationName=pulsar_broker
metadataStoreUrl=
configurationMetadataStoreUrl=
-brokerServicePort=6650
-brokerServicePortTls=6651
-webServicePort=8080
+brokerServicePort=0
+brokerServicePortTls=
+webServicePort=0
allowLoopback=true
-webServicePortTls=4443
+webServicePortTls=
bindAddress=0.0.0.0
advertisedAddress=
advertisedListeners=
internalListenerName=internal
-clusterName="test_cluster"
+clusterName=test_cluster
brokerShutdownTimeoutMs=3000
backlogQuotaCheckEnabled=true
backlogQuotaCheckIntervalInSeconds=60
@@ -44,17 +44,17 @@ clientLibraryVersionCheckEnabled=false
clientLibraryVersionCheckAllowUnversioned=true
statusFilePath=/tmp/status.html
tlsEnabled=false
-tlsCertificateFilePath=/usr/local/conf/pulsar/server.crt
-tlsKeyFilePath=/home/local/conf/pulsar/server.key
+tlsCertificateFilePath=
+tlsKeyFilePath=
tlsTrustCertsFilePath=
tlsAllowInsecureConnection=false
authenticationEnabled=false
authorizationEnabled=false
-superUserRoles="test_user"
-brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled"
+superUserRoles=test_user
+brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationDisabled
brokerClientAuthenticationParameters=
-bookkeeperClientAuthenticationPlugin="test_auth_plugin"
-bookkeeperClientAuthenticationAppId="test_auth_id"
+bookkeeperClientAuthenticationPlugin=
+bookkeeperClientAuthenticationAppId=test_auth_id
bookkeeperClientTimeoutInSeconds=30
bookkeeperClientSpeculativeReadTimeoutInMillis=0
bookkeeperClientHealthCheckEnabled=true
@@ -66,7 +66,7 @@ bookkeeperClientRegionawarePolicyEnabled=false
bookkeeperClientMinNumRacksPerWriteQuorum=2
bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false
bookkeeperClientReorderReadSequenceEnabled=false
-bookkeeperClientIsolationGroups="test_group"
+bookkeeperClientIsolationGroups=test_group
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=2
managedLedgerDefaultAckQuorum=2
diff --git a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
index 4e2fd40298354..6f0d82cef17bc 100644
--- a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
+++ b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
@@ -17,8 +17,8 @@
# under the License.
#
-brokerServicePort=6650
-webServicePort=8080
+brokerServicePort=0
+webServicePort=0
allowLoopback=true
clusterName=test_cluster
superUserRoles=admin
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index 78952fcaed8b3..6c46bce254f6f 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -308,14 +308,33 @@ static ClientBuilder builder() {
*
* This can be used to discover the partitions and create {@link Reader}, {@link Consumer} or {@link Producer}
* instances directly on a particular partition.
- *
+ * @Deprecated it is not suggested to use now; please use {@link #getPartitionsForTopic(String, boolean)}.
* @param topic
* the topic name
* @return a future that will yield a list of the topic partitions or {@link PulsarClientException} if there was any
* error in the operation.
+ *
* @since 2.3.0
*/
- CompletableFuture> getPartitionsForTopic(String topic);
+ @Deprecated
+ default CompletableFuture> getPartitionsForTopic(String topic) {
+ return getPartitionsForTopic(topic, true);
+ }
+
+ /**
+ * 1. Get the partitions if the topic exists. Return "[{partition-0}, {partition-1}....{partition-n}}]" if a
+ * partitioned topic exists; return "[{topic}]" if a non-partitioned topic exists.
+ * 2. When {@param metadataAutoCreationEnabled} is "false", neither the partitioned topic nor non-partitioned
+ * topic does not exist. You will get an {@link PulsarClientException.NotFoundException} or a
+ * {@link PulsarClientException.TopicDoesNotExistException}.
+ * 2-1. You will get a {@link PulsarClientException.NotSupportedException} with metadataAutoCreationEnabled=false
+ * on an old broker version which does not support getting partitions without partitioned metadata auto-creation.
+ * 3. When {@param metadataAutoCreationEnabled} is "true," it will trigger an auto-creation for this topic(using
+ * the default topic auto-creation strategy you set for the broker), and the corresponding result is returned.
+ * For the result, see case 1.
+ * @version 3.3.0.
+ */
+ CompletableFuture> getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled);
/**
* Close the PulsarClient and release all the resources.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 9d01d863143e2..980e8a0c78627 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -121,12 +121,14 @@ public CompletableFuture> getBroker(T
* calls broker binaryProto-lookup api to get metadata of partitioned-topic.
*
*/
- public CompletableFuture getPartitionedTopicMetadata(TopicName topicName) {
+ @Override
+ public CompletableFuture getPartitionedTopicMetadata(
+ TopicName topicName, boolean metadataAutoCreationEnabled) {
final MutableObject newFutureCreated = new MutableObject<>();
try {
return partitionedMetadataInProgress.computeIfAbsent(topicName, tpName -> {
- CompletableFuture newFuture =
- getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), topicName);
+ CompletableFuture newFuture = getPartitionedTopicMetadata(
+ serviceNameResolver.resolveHost(), topicName, metadataAutoCreationEnabled);
newFutureCreated.setValue(newFuture);
return newFuture;
});
@@ -222,13 +224,20 @@ private CompletableFuture> findBroker
}
private CompletableFuture getPartitionedTopicMetadata(InetSocketAddress socketAddress,
- TopicName topicName) {
+ TopicName topicName, boolean metadataAutoCreationEnabled) {
CompletableFuture partitionFuture = new CompletableFuture<>();
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
+ if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) {
+ partitionFuture.completeExceptionally(new PulsarClientException.NotSupportedException("The feature of"
+ + " getting partitions without auto-creation is not supported from the broker,"
+ + " please upgrade the broker to the latest version."));
+ return;
+ }
long requestId = client.newRequestId();
- ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
+ ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId,
+ metadataAutoCreationEnabled);
clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
if (t != null) {
log.warn("[{}] failed to get Partitioned metadata : {}", topicName,
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 8694dad7a2d84..b8caa0f438361 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -186,6 +186,8 @@ public class ClientCnx extends PulsarHandler {
protected AuthenticationDataProvider authenticationDataProvider;
private TransactionBufferHandler transactionBufferHandler;
private boolean supportsTopicWatchers;
+ @Getter
+ private boolean supportsGetPartitionedMetadataWithoutAutoCreation;
/** Idle stat. **/
@Getter
@@ -382,6 +384,9 @@ protected void handleConnected(CommandConnected connected) {
supportsTopicWatchers =
connected.hasFeatureFlags() && connected.getFeatureFlags().isSupportsTopicWatchers();
+ supportsGetPartitionedMetadataWithoutAutoCreation =
+ connected.hasFeatureFlags()
+ && connected.getFeatureFlags().isSupportsGetPartitionedMetadataWithoutAutoCreation();
// set remote protocol version to the correct version before we complete the connection future
setRemoteEndpointProtocolVersion(connected.getProtocolVersion());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 6403d48d7be0c..2b7fb90b14a47 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -96,9 +96,16 @@ protected void grabCnx() {
try {
CompletableFuture cnxFuture;
if (state.redirectedClusterURI != null) {
- InetSocketAddress address = InetSocketAddress.createUnresolved(state.redirectedClusterURI.getHost(),
- state.redirectedClusterURI.getPort());
- cnxFuture = state.client.getConnection(address, address, randomKeyForSelectConnection);
+ if (state.topic == null) {
+ InetSocketAddress address = InetSocketAddress.createUnresolved(state.redirectedClusterURI.getHost(),
+ state.redirectedClusterURI.getPort());
+ cnxFuture = state.client.getConnection(address, address, randomKeyForSelectConnection);
+ } else {
+ // once, client receives redirection url, client has to perform lookup on migrated
+ // cluster to find the broker that owns the topic and then create connection.
+ // below method, performs the lookup for a given topic and then creates connection
+ cnxFuture = state.client.getConnection(state.topic, (state.redirectedClusterURI.toString()));
+ }
} else if (state.topic == null) {
cnxFuture = state.client.getConnectionToServiceUrl();
} else {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 7686d0072cffb..4d6cf96a01068 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -32,6 +32,7 @@
import lombok.Getter;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -58,7 +59,6 @@
import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.FutureUtil;
@Getter(AccessLevel.PUBLIC)
@@ -104,6 +104,31 @@ public Consumer subscribe() throws PulsarClientException {
}
}
+ private CompletableFuture checkDlqAlreadyExists(String topic) {
+ CompletableFuture existsFuture = new CompletableFuture<>();
+ client.getPartitionedTopicMetadata(topic, false).thenAccept(metadata -> {
+ TopicName topicName = TopicName.get(topic);
+ if (topicName.isPersistent()) {
+ // Either partitioned or non-partitioned, it exists.
+ existsFuture.complete(true);
+ } else {
+ // If it is a non-persistent topic, return true only it is a partitioned topic.
+ existsFuture.complete(metadata != null && metadata.partitions > 0);
+ }
+ }).exceptionally(ex -> {
+ Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+ if (actEx instanceof PulsarClientException.NotFoundException
+ || actEx instanceof PulsarClientException.TopicDoesNotExistException
+ || actEx instanceof PulsarAdminException.NotFoundException) {
+ existsFuture.complete(false);
+ } else {
+ existsFuture.completeExceptionally(ex);
+ }
+ return null;
+ });
+ return existsFuture;
+ }
+
@Override
public CompletableFuture> subscribeAsync() {
if (conf.getTopicNames().isEmpty() && conf.getTopicsPattern() == null) {
@@ -135,20 +160,18 @@ public CompletableFuture> subscribeAsync() {
DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())
|| StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
- CompletableFuture retryLetterTopicMetadata =
- client.getPartitionedTopicMetadata(oldRetryLetterTopic);
- CompletableFuture deadLetterTopicMetadata =
- client.getPartitionedTopicMetadata(oldDeadLetterTopic);
+ CompletableFuture retryLetterTopicMetadata = checkDlqAlreadyExists(oldRetryLetterTopic);
+ CompletableFuture deadLetterTopicMetadata = checkDlqAlreadyExists(oldDeadLetterTopic);
applyDLQConfig = CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata)
.thenAccept(__ -> {
String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName()
+ RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
String deadLetterTopic = topicFirst + "-" + conf.getSubscriptionName()
+ RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
- if (retryLetterTopicMetadata.join().partitions > 0) {
+ if (retryLetterTopicMetadata.join()) {
retryLetterTopic = oldRetryLetterTopic;
}
- if (deadLetterTopicMetadata.join().partitions > 0) {
+ if (deadLetterTopicMetadata.join()) {
deadLetterTopic = oldDeadLetterTopic;
}
if (deadLetterPolicy == null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 6e8c2b4314e17..ba04aaa3b3117 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -108,10 +108,11 @@ public CompletableFuture> getBroker(T
}
@Override
- public CompletableFuture getPartitionedTopicMetadata(TopicName topicName) {
+ public CompletableFuture getPartitionedTopicMetadata(
+ TopicName topicName, boolean metadataAutoCreationEnabled) {
String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions";
- return httpClient.get(String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=true",
- PartitionedTopicMetadata.class);
+ return httpClient.get(String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation="
+ + metadataAutoCreationEnabled, PartitionedTopicMetadata.class);
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index 48ef67eae2047..ba99cb77550f5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -60,11 +60,30 @@ public interface LookupService extends AutoCloseable {
/**
* Returns {@link PartitionedTopicMetadata} for a given topic.
- *
- * @param topicName topic-name
- * @return
+ * Note: this method will try to create the topic partitioned metadata if it does not exist.
+ * @deprecated Please call {{@link #getPartitionedTopicMetadata(TopicName, boolean)}}.
*/
- CompletableFuture getPartitionedTopicMetadata(TopicName topicName);
+ @Deprecated
+ default CompletableFuture getPartitionedTopicMetadata(TopicName topicName) {
+ return getPartitionedTopicMetadata(topicName, true);
+ }
+
+ /**
+ * 1.Get the partitions if the topic exists. Return "{partition: n}" if a partitioned topic exists;
+ * return "{partition: 0}" if a non-partitioned topic exists.
+ * 2. When {@param metadataAutoCreationEnabled} is "false," neither partitioned topic nor non-partitioned topic
+ * does not exist. You will get a {@link PulsarClientException.NotFoundException} or
+ * a {@link PulsarClientException.TopicDoesNotExistException}.
+ * 2-1. You will get a {@link PulsarClientException.NotSupportedException} with metadataAutoCreationEnabled=false
+ * on an old broker version which does not support getting partitions without partitioned metadata
+ * auto-creation.
+ * 3.When {@param metadataAutoCreationEnabled} is "true," it will trigger an auto-creation for this topic(using
+ * the default topic auto-creation strategy you set for the broker), and the corresponding result is returned.
+ * For the result, see case 1.
+ * @version 3.3.0.
+ */
+ CompletableFuture getPartitionedTopicMetadata(TopicName topicName,
+ boolean metadataAutoCreationEnabled);
/**
* Returns current SchemaInfo {@link SchemaInfo} for a given topic.
@@ -105,5 +124,4 @@ public interface LookupService extends AutoCloseable {
*/
CompletableFuture getTopicsUnderNamespace(NamespaceName namespace, Mode mode,
String topicPattern, String topicsHash);
-
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index f2bce59a1e68e..62b6612fa3c26 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -954,7 +954,7 @@ public CompletableFuture subscribeAsync(String topicName, boolean createTo
CompletableFuture subscribeResult = new CompletableFuture<>();
- client.getPartitionedTopicMetadata(topicName)
+ client.getPartitionedTopicMetadata(topicName, true)
.thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, fullTopicName, metadata.partitions,
createTopicIfDoesNotExist))
.exceptionally(ex1 -> {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index b92e039e5facd..899e4e8fae7c7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -32,7 +32,9 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -106,6 +108,7 @@ public class PulsarClientImpl implements PulsarClient {
private final boolean createdScheduledProviders;
private LookupService lookup;
+ private Map urlLookupMap = new ConcurrentHashMap<>();
private final ConnectionPool cnxPool;
@Getter
private final Timer timer;
@@ -377,7 +380,7 @@ private CompletableFuture> createProducerAsync(String topic,
ProducerInterceptors interceptors) {
CompletableFuture> producerCreatedFuture = new CompletableFuture<>();
- getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
+ getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions);
}
@@ -519,7 +522,7 @@ private CompletableFuture> doSingleTopicSubscribeAsync(ConsumerC
String topic = conf.getSingleTopic();
- getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
+ getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions);
}
@@ -659,7 +662,7 @@ protected CompletableFuture> createSingleTopicReaderAsync(
CompletableFuture> readerFuture = new CompletableFuture<>();
- getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
+ getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions);
}
@@ -733,6 +736,21 @@ public void close() throws PulsarClientException {
}
}
+ private void closeUrlLookupMap() {
+ Map closedUrlLookupServices = new HashMap(urlLookupMap.size());
+ urlLookupMap.entrySet().forEach(e -> {
+ try {
+ e.getValue().close();
+ } catch (Exception ex) {
+ log.error("Error closing lookup service {}", e.getKey(), ex);
+ }
+ closedUrlLookupServices.put(e.getKey(), e.getValue());
+ });
+ closedUrlLookupServices.entrySet().forEach(e -> {
+ urlLookupMap.remove(e.getKey(), e.getValue());
+ });
+ }
+
@Override
public CompletableFuture closeAsync() {
log.info("Client closing. URL: {}", lookup.getServiceUrl());
@@ -743,6 +761,8 @@ public CompletableFuture closeAsync() {
final CompletableFuture closeFuture = new CompletableFuture<>();
List> futures = new ArrayList<>();
+ closeUrlLookupMap();
+
producers.forEach(p -> futures.add(p.closeAsync().handle((__, t) -> {
if (t != null) {
log.error("Error closing producer {}", p, t);
@@ -962,6 +982,27 @@ public CompletableFuture getConnection(final String topic) {
.thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight(), cnxPool.genRandomKeyToSelectCon()));
}
+ public CompletableFuture getConnection(final String topic, final String url) {
+ TopicName topicName = TopicName.get(topic);
+ return getLookup(url).getBroker(topicName)
+ .thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight(), cnxPool.genRandomKeyToSelectCon()));
+ }
+
+ public LookupService getLookup(String serviceUrl) {
+ return urlLookupMap.computeIfAbsent(serviceUrl, url -> {
+ if (isClosed()) {
+ throw new IllegalStateException("Pulsar client has been closed, can not build LookupService when"
+ + " calling get lookup with an url");
+ }
+ try {
+ return createLookup(serviceUrl);
+ } catch (PulsarClientException e) {
+ log.warn("Failed to update url to lookup service {}, {}", url, e.getMessage());
+ throw new IllegalStateException("Failed to update url " + url);
+ }
+ });
+ }
+
public CompletableFuture getConnectionToServiceUrl() {
if (!(lookup instanceof BinaryProtoLookupService)) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidServiceURL(
@@ -1020,19 +1061,20 @@ public LookupService getLookup() {
}
public void reloadLookUp() throws PulsarClientException {
- if (conf.getServiceUrl().startsWith("http")) {
- lookup = new HttpLookupService(conf, eventLoopGroup);
+ lookup = createLookup(conf.getServiceUrl());
+ }
+
+ public LookupService createLookup(String url) throws PulsarClientException {
+ if (url.startsWith("http")) {
+ return new HttpLookupService(conf, eventLoopGroup);
} else {
- lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(),
+ return new BinaryProtoLookupService(this, url, conf.getListenerName(), conf.isUseTls(),
externalExecutorProvider.getExecutor());
}
}
- public CompletableFuture getNumberOfPartitions(String topic) {
- return getPartitionedTopicMetadata(topic).thenApply(metadata -> metadata.partitions);
- }
-
- public CompletableFuture getPartitionedTopicMetadata(String topic) {
+ public CompletableFuture getPartitionedTopicMetadata(
+ String topic, boolean metadataAutoCreationEnabled) {
CompletableFuture metadataFuture = new CompletableFuture<>();
@@ -1045,7 +1087,7 @@ public CompletableFuture getPartitionedTopicMetadata(S
.setMax(conf.getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.create();
getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs,
- metadataFuture, new ArrayList<>());
+ metadataFuture, new ArrayList<>(), metadataAutoCreationEnabled);
} catch (IllegalArgumentException e) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage()));
}
@@ -1056,15 +1098,19 @@ private void getPartitionedTopicMetadata(TopicName topicName,
Backoff backoff,
AtomicLong remainingTime,
CompletableFuture future,
- List previousExceptions) {
+ List previousExceptions,
+ boolean metadataAutoCreationEnabled) {
long startTime = System.nanoTime();
- lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> {
+ CompletableFuture queryFuture =
+ lookup.getPartitionedTopicMetadata(topicName, metadataAutoCreationEnabled);
+ queryFuture.thenAccept(future::complete).exceptionally(e -> {
remainingTime.addAndGet(-1 * TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
long nextDelay = Math.min(backoff.next(), remainingTime.get());
// skip retry scheduler when set lookup throttle in client or server side which will lead to
// `TooManyRequestsException`
boolean isLookupThrottling = !PulsarClientException.isRetriableError(e.getCause())
- || e.getCause() instanceof PulsarClientException.AuthenticationException;
+ || e.getCause() instanceof PulsarClientException.AuthenticationException
+ || e.getCause() instanceof PulsarClientException.NotFoundException;
if (nextDelay <= 0 || isLookupThrottling) {
PulsarClientException.setPreviousExceptions(e, previousExceptions);
future.completeExceptionally(e);
@@ -1076,15 +1122,16 @@ private void getPartitionedTopicMetadata(TopicName topicName,
log.warn("[topic: {}] Could not get connection while getPartitionedTopicMetadata -- "
+ "Will try again in {} ms", topicName, nextDelay);
remainingTime.addAndGet(-nextDelay);
- getPartitionedTopicMetadata(topicName, backoff, remainingTime, future, previousExceptions);
+ getPartitionedTopicMetadata(topicName, backoff, remainingTime, future, previousExceptions,
+ metadataAutoCreationEnabled);
}, nextDelay, TimeUnit.MILLISECONDS);
return null;
});
}
@Override
- public CompletableFuture> getPartitionsForTopic(String topic) {
- return getPartitionedTopicMetadata(topic).thenApply(metadata -> {
+ public CompletableFuture> getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled) {
+ return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled).thenApply(metadata -> {
if (metadata.partitions > 0) {
TopicName topicName = TopicName.get(topic);
List partitions = new ArrayList<>(metadata.partitions);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index 9e79fc203c225..499627f9c73f2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -79,7 +79,8 @@ public void start() throws TransactionCoordinatorClientException {
@Override
public CompletableFuture startAsync() {
if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) {
- return pulsarClient.getLookup().getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN)
+ return pulsarClient.getLookup()
+ .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, true)
.thenCompose(partitionMeta -> {
List> connectFutureList = new ArrayList<>();
if (LOG.isDebugEnabled()) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index febec2bff3285..191124bb7b002 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -22,6 +22,7 @@
import static org.apache.pulsar.client.impl.ClientTestFixtures.createExceptionFuture;
import static org.apache.pulsar.client.impl.ClientTestFixtures.createPulsarClientMockWithMockedClientCnx;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -153,7 +154,8 @@ private MultiTopicsConsumerImpl createMultiTopicsConsumer(
int completionDelayMillis = 100;
Schema schema = Schema.BYTES;
PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
- when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> createDelayedCompletedFuture(
+ when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean()))
+ .thenAnswer(invocation -> createDelayedCompletedFuture(
new PartitionedTopicMetadata(), completionDelayMillis));
MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl(
clientMock, consumerConfData, executorProvider,
@@ -201,7 +203,8 @@ public void testConsumerCleanupOnSubscribeFailure() {
int completionDelayMillis = 10;
Schema schema = Schema.BYTES;
PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
- when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> createExceptionFuture(
+ when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean()))
+ .thenAnswer(invocation -> createExceptionFuture(
new PulsarClientException.InvalidConfigurationException("a mock exception"), completionDelayMillis));
CompletableFuture> completeFuture = new CompletableFuture<>();
MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl(clientMock, consumerConfData,
@@ -237,7 +240,8 @@ public void testDontCheckForPartitionsUpdatesOnNonPartitionedTopics() throws Exc
// Simulate non partitioned topics
PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(0);
- when(clientMock.getPartitionedTopicMetadata(any())).thenReturn(CompletableFuture.completedFuture(metadata));
+ when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(metadata));
CompletableFuture> completeFuture = new CompletableFuture<>();
MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl<>(
@@ -248,7 +252,7 @@ public void testDontCheckForPartitionsUpdatesOnNonPartitionedTopics() throws Exc
// getPartitionedTopicMetadata should have been called only the first time, for each of the 3 topics,
// but not anymore since the topics are not partitioned.
- verify(clientMock, times(3)).getPartitionedTopicMetadata(any());
+ verify(clientMock, times(3)).getPartitionedTopicMetadata(any(), anyBoolean());
}
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index e0b25db891247..e13c060a052ec 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.nullable;
@@ -106,7 +107,7 @@ public void testConsumerIsClosed() throws Exception {
nullable(String.class)))
.thenReturn(CompletableFuture.completedFuture(
new GetTopicsResult(Collections.emptyList(), null, false, true)));
- when(lookup.getPartitionedTopicMetadata(any(TopicName.class)))
+ when(lookup.getPartitionedTopicMetadata(any(TopicName.class), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata()));
when(lookup.getBroker(any()))
.thenReturn(CompletableFuture.completedFuture(
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewBuilderImplTest.java
index eee8ba4e8f41a..01353e47cd0cb 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewBuilderImplTest.java
@@ -18,6 +18,14 @@
*/
package org.apache.pulsar.client.impl;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertNotNull;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -25,32 +33,25 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertNotNull;
-
/**
- * Unit tests of {@link TablewViewBuilderImpl}.
+ * Unit tests of {@link TableViewBuilderImpl}.
*/
public class TableViewBuilderImplTest {
private static final String TOPIC_NAME = "testTopicName";
private PulsarClientImpl client;
private TableViewBuilderImpl tableViewBuilderImpl;
+ private CompletableFuture readNextFuture;
@BeforeClass(alwaysRun = true)
public void setup() {
Reader reader = mock(Reader.class);
- when(reader.readNextAsync()).thenReturn(CompletableFuture.allOf());
+ readNextFuture = new CompletableFuture();
+ when(reader.readNextAsync()).thenReturn(readNextFuture);
client = mock(PulsarClientImpl.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
when(client.getCnxPool()).thenReturn(connectionPool);
@@ -61,6 +62,14 @@ public void setup() {
tableViewBuilderImpl = new TableViewBuilderImpl(client, Schema.BYTES);
}
+ @AfterClass(alwaysRun = true)
+ public void cleanup() {
+ if (readNextFuture != null) {
+ readNextFuture.completeExceptionally(new PulsarClientException.AlreadyClosedException("Closing test case"));
+ readNextFuture = null;
+ }
+ }
+
@Test
public void testTableViewBuilderImpl() throws PulsarClientException {
TableView tableView = tableViewBuilderImpl.topic(TOPIC_NAME)
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 2332a91a469d4..e39dd288abf6e 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -99,6 +99,12 @@
linux-x86_64
+
+ io.netty
+ netty-transport-native-epoll
+ linux-aarch_64
+
+
io.netty
netty-transport-native-unix-common
@@ -238,6 +244,11 @@
awaitility
test
+
+
+ org.roaringbitmap
+ RoaringBitmap
+
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index eebca0e0d7214..e051e01495dbe 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -358,17 +358,16 @@ public static String fromPersistenceNamingEncoding(String mlName) {
String localName;
if (parts.size() == 4) {
tenant = parts.get(0);
- cluster = null;
namespacePortion = parts.get(1);
domain = parts.get(2);
- localName = parts.get(3);
+ localName = Codec.decode(parts.get(3));
return String.format("%s://%s/%s/%s", domain, tenant, namespacePortion, localName);
} else if (parts.size() == 5) {
tenant = parts.get(0);
cluster = parts.get(1);
namespacePortion = parts.get(2);
domain = parts.get(3);
- localName = parts.get(4);
+ localName = Codec.decode(parts.get(4));
return String.format("%s://%s/%s/%s/%s", domain, tenant, cluster, namespacePortion, localName);
} else {
throw new IllegalArgumentException("Invalid managedLedger name: " + mlName);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index faa5fbcd30130..c352da0c871ed 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -188,6 +188,7 @@ private static void setFeatureFlags(FeatureFlags flags) {
flags.setSupportsAuthRefresh(true);
flags.setSupportsBrokerEntryMetadata(true);
flags.setSupportsPartialProducer(true);
+ flags.setSupportsGetPartitionedMetadataWithoutAutoCreation(true);
}
public static ByteBuf newConnect(String authMethodName, String authData, int protocolVersion, String libVersion,
@@ -298,6 +299,7 @@ public static BaseCommand newConnectedCommand(int clientProtocolVersion, int max
connected.setProtocolVersion(versionToAdvertise);
connected.setFeatureFlags().setSupportsTopicWatchers(supportsTopicWatchers);
+ connected.setFeatureFlags().setSupportsGetPartitionedMetadataWithoutAutoCreation(true);
return cmd;
}
@@ -880,11 +882,13 @@ public static ByteBuf newPartitionMetadataResponse(ServerError error, String err
return serializeWithSize(newPartitionMetadataResponseCommand(error, errorMsg, requestId));
}
- public static ByteBuf newPartitionMetadataRequest(String topic, long requestId) {
+ public static ByteBuf newPartitionMetadataRequest(String topic, long requestId,
+ boolean metadataAutoCreationEnabled) {
BaseCommand cmd = localCmd(Type.PARTITIONED_METADATA);
cmd.setPartitionMetadata()
.setTopic(topic)
- .setRequestId(requestId);
+ .setRequestId(requestId)
+ .setMetadataAutoCreationEnabled(metadataAutoCreationEnabled);
return serializeWithSize(cmd);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index f6fcb12f35939..0628d494af3af 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -199,9 +199,9 @@ public static CompletableFuture failedFuture(Throwable t) {
public static Throwable unwrapCompletionException(Throwable ex) {
if (ex instanceof CompletionException) {
- return ex.getCause();
+ return unwrapCompletionException(ex.getCause());
} else if (ex instanceof ExecutionException) {
- return ex.getCause();
+ return unwrapCompletionException(ex.getCause());
} else {
return ex;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
index 72215d7296cc3..b5ad89d1695d4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
@@ -29,6 +29,7 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.mutable.MutableInt;
+import org.roaringbitmap.RoaringBitSet;
/**
* A Concurrent set comprising zero or more ranges of type {@link LongPair}. This can be alternative of
@@ -44,7 +45,7 @@
public class ConcurrentOpenLongPairRangeSet> implements LongPairRangeSet {
protected final NavigableMap rangeBitSetMap = new ConcurrentSkipListMap<>();
- private boolean threadSafe = true;
+ private final boolean threadSafe;
private final int bitSetSize;
private final LongPairConsumer consumer;
@@ -95,9 +96,7 @@ public void addOpenClosed(long lowerKey, long lowerValueOpen, long upperKey, lon
// (2) set 0th-index to upper-index in upperRange.getKey()
if (isValid(upperKey, upperValue)) {
BitSet rangeBitSet = rangeBitSetMap.computeIfAbsent(upperKey, (key) -> createNewBitSet());
- if (rangeBitSet != null) {
- rangeBitSet.set(0, (int) upperValue + 1);
- }
+ rangeBitSet.set(0, (int) upperValue + 1);
}
// No-op if values are not valid eg: if lower == LongPair.earliest or upper == LongPair.latest then nothing
// to set
@@ -414,7 +413,6 @@ private int getSafeEntry(long value) {
}
private BitSet createNewBitSet() {
- return this.threadSafe ? new ConcurrentBitSet(bitSetSize) : new BitSet(bitSetSize);
+ return this.threadSafe ? new ConcurrentRoaringBitSet() : new RoaringBitSet();
}
-
-}
+}
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java
new file mode 100644
index 0000000000000..814e58400993b
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import java.util.BitSet;
+import java.util.concurrent.locks.StampedLock;
+import java.util.stream.IntStream;
+import org.roaringbitmap.RoaringBitSet;
+
+public class ConcurrentRoaringBitSet extends RoaringBitSet {
+ private final StampedLock rwLock = new StampedLock();
+
+ public ConcurrentRoaringBitSet() {
+ super();
+ }
+
+ @Override
+ public boolean get(int bitIndex) {
+ long stamp = rwLock.tryOptimisticRead();
+ boolean isSet = super.get(bitIndex);
+ if (!rwLock.validate(stamp)) {
+ stamp = rwLock.readLock();
+ try {
+ isSet = super.get(bitIndex);
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return isSet;
+ }
+
+ @Override
+ public void set(int bitIndex) {
+ long stamp = rwLock.writeLock();
+ try {
+ super.set(bitIndex);
+ } finally {
+ rwLock.unlockWrite(stamp);
+ }
+ }
+
+ @Override
+ public void clear(int bitIndex) {
+ long stamp = rwLock.writeLock();
+ try {
+ super.clear(bitIndex);
+ } finally {
+ rwLock.unlockWrite(stamp);
+ }
+ }
+
+ @Override
+ public void set(int fromIndex, int toIndex) {
+ long stamp = rwLock.writeLock();
+ try {
+ super.set(fromIndex, toIndex);
+ } finally {
+ rwLock.unlockWrite(stamp);
+ }
+ }
+
+ @Override
+ public void clear(int fromIndex, int toIndex) {
+ long stamp = rwLock.writeLock();
+ try {
+ super.clear(fromIndex, toIndex);
+ } finally {
+ rwLock.unlockWrite(stamp);
+ }
+ }
+
+ @Override
+ public void clear() {
+ long stamp = rwLock.writeLock();
+ try {
+ super.clear();
+ } finally {
+ rwLock.unlockWrite(stamp);
+ }
+ }
+
+ @Override
+ public int nextSetBit(int fromIndex) {
+ long stamp = rwLock.tryOptimisticRead();
+ int nextSetBit = super.nextSetBit(fromIndex);
+ if (!rwLock.validate(stamp)) {
+ // Fallback to read lock
+ stamp = rwLock.readLock();
+ try {
+ nextSetBit = super.nextSetBit(fromIndex);
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return nextSetBit;
+ }
+
+ @Override
+ public int nextClearBit(int fromIndex) {
+ long stamp = rwLock.tryOptimisticRead();
+ int nextClearBit = super.nextClearBit(fromIndex);
+ if (!rwLock.validate(stamp)) {
+ // Fallback to read lock
+ stamp = rwLock.readLock();
+ try {
+ nextClearBit = super.nextClearBit(fromIndex);
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return nextClearBit;
+ }
+
+ @Override
+ public int previousSetBit(int fromIndex) {
+ long stamp = rwLock.tryOptimisticRead();
+ int previousSetBit = super.previousSetBit(fromIndex);
+ if (!rwLock.validate(stamp)) {
+ // Fallback to read lock
+ stamp = rwLock.readLock();
+ try {
+ previousSetBit = super.previousSetBit(fromIndex);
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return previousSetBit;
+ }
+
+ @Override
+ public int previousClearBit(int fromIndex) {
+ long stamp = rwLock.tryOptimisticRead();
+ int previousClearBit = super.previousClearBit(fromIndex);
+ if (!rwLock.validate(stamp)) {
+ // Fallback to read lock
+ stamp = rwLock.readLock();
+ try {
+ previousClearBit = super.previousClearBit(fromIndex);
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return previousClearBit;
+ }
+
+ @Override
+ public int length() {
+ long stamp = rwLock.tryOptimisticRead();
+ int length = super.length();
+ if (!rwLock.validate(stamp)) {
+ // Fallback to read lock
+ stamp = rwLock.readLock();
+ try {
+ length = super.length();
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return length;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ long stamp = rwLock.tryOptimisticRead();
+ boolean isEmpty = super.isEmpty();
+ if (!rwLock.validate(stamp)) {
+ // Fallback to read lock
+ stamp = rwLock.readLock();
+ try {
+ isEmpty = super.isEmpty();
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return isEmpty;
+ }
+
+ @Override
+ public int cardinality() {
+ long stamp = rwLock.tryOptimisticRead();
+ int cardinality = super.cardinality();
+ if (!rwLock.validate(stamp)) {
+ // Fallback to read lock
+ stamp = rwLock.readLock();
+ try {
+ cardinality = super.cardinality();
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return cardinality;
+ }
+
+ @Override
+ public int size() {
+ long stamp = rwLock.tryOptimisticRead();
+ int size = super.size();
+ if (!rwLock.validate(stamp)) {
+ // Fallback to read lock
+ stamp = rwLock.readLock();
+ try {
+ size = super.size();
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return size;
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ long stamp = rwLock.tryOptimisticRead();
+ byte[] byteArray = super.toByteArray();
+ if (!rwLock.validate(stamp)) {
+ // Fallback to read lock
+ stamp = rwLock.readLock();
+ try {
+ byteArray = super.toByteArray();
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return byteArray;
+ }
+
+ @Override
+ public long[] toLongArray() {
+ long stamp = rwLock.tryOptimisticRead();
+ long[] longArray = super.toLongArray();
+ if (!rwLock.validate(stamp)) {
+ // Fallback to read lock
+ stamp = rwLock.readLock();
+ try {
+ longArray = super.toLongArray();
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return longArray;
+ }
+
+ @Override
+ public void flip(int bitIndex) {
+ long stamp = rwLock.writeLock();
+ try {
+ super.flip(bitIndex);
+ } finally {
+ rwLock.unlockWrite(stamp);
+ }
+ }
+
+ @Override
+ public void flip(int fromIndex, int toIndex) {
+ long stamp = rwLock.writeLock();
+ try {
+ super.flip(fromIndex, toIndex);
+ } finally {
+ rwLock.unlockWrite(stamp);
+ }
+ }
+
+ @Override
+ public void set(int bitIndex, boolean value) {
+ long stamp = rwLock.writeLock();
+ try {
+ super.set(bitIndex, value);
+ } finally {
+ rwLock.unlockWrite(stamp);
+ }
+ }
+
+ @Override
+ public void set(int fromIndex, int toIndex, boolean value) {
+ long stamp = rwLock.writeLock();
+ try {
+ super.set(fromIndex, toIndex, value);
+ } finally {
+ rwLock.unlockWrite(stamp);
+ }
+ }
+
+ @Override
+ public BitSet get(int fromIndex, int toIndex) {
+ long stamp = rwLock.tryOptimisticRead();
+ BitSet bitSet = super.get(fromIndex, toIndex);
+ if (!rwLock.validate(stamp)) {
+ // Fallback to read lock
+ stamp = rwLock.readLock();
+ try {
+ bitSet = super.get(fromIndex, toIndex);
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return bitSet;
+ }
+
+ @Override
+ public boolean intersects(BitSet set) {
+ long stamp = rwLock.writeLock();
+ try {
+ return super.intersects(set);
+ } finally {
+ rwLock.unlockWrite(stamp);
+ }
+ }
+
+ @Override
+ public void and(BitSet set) {
+ long stamp = rwLock.writeLock();
+ try {
+ super.and(set);
+ } finally {
+ rwLock.unlockWrite(stamp);
+ }
+ }
+
+ @Override
+ public void or(BitSet set) {
+ long stamp = rwLock.writeLock();
+ try {
+ super.or(set);
+ } finally {
+ rwLock.unlockWrite(stamp);
+ }
+ }
+
+ @Override
+ public void xor(BitSet set) {
+ long stamp = rwLock.writeLock();
+ try {
+ super.xor(set);
+ } finally {
+ rwLock.unlockWrite(stamp);
+ }
+ }
+
+ @Override
+ public void andNot(BitSet set) {
+ long stamp = rwLock.writeLock();
+ try {
+ super.andNot(set);
+ } finally {
+ rwLock.unlockWrite(stamp);
+ }
+ }
+
+ /**
+ * Returns the clone of the internal wrapped {@code BitSet}.
+ * This won't be a clone of the {@code ConcurrentBitSet} object.
+ *
+ * @return a clone of the internal wrapped {@code BitSet}
+ */
+ @Override
+ public Object clone() {
+ long stamp = rwLock.tryOptimisticRead();
+ RoaringBitSet clone = (RoaringBitSet) super.clone();
+ if (!rwLock.validate(stamp)) {
+ // Fallback to read lock
+ stamp = rwLock.readLock();
+ try {
+ clone = (RoaringBitSet) super.clone();
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return clone;
+ }
+
+ @Override
+ public String toString() {
+ long stamp = rwLock.tryOptimisticRead();
+ String str = super.toString();
+ if (!rwLock.validate(stamp)) {
+ // Fallback to read lock
+ stamp = rwLock.readLock();
+ try {
+ str = super.toString();
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return str;
+ }
+
+ /**
+ * This operation is not supported on {@code ConcurrentBitSet}.
+ */
+ @Override
+ public IntStream stream() {
+ throw new UnsupportedOperationException("stream is not supported");
+ }
+
+ public boolean equals(final Object o) {
+ long stamp = rwLock.tryOptimisticRead();
+ boolean isEqual = super.equals(o);
+ if (!rwLock.validate(stamp)) {
+ // Fallback to read lock
+ stamp = rwLock.readLock();
+ try {
+ isEqual = super.equals(o);
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return isEqual;
+ }
+
+ public int hashCode() {
+ long stamp = rwLock.tryOptimisticRead();
+ int hashCode = super.hashCode();
+ if (!rwLock.validate(stamp)) {
+ // Fallback to read lock
+ stamp = rwLock.readLock();
+ try {
+ hashCode = super.hashCode();
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return hashCode;
+ }
+}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index afe193eeb7e9d..f56df6ae9d103 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -300,6 +300,7 @@ message FeatureFlags {
optional bool supports_broker_entry_metadata = 2 [default = false];
optional bool supports_partial_producer = 3 [default = false];
optional bool supports_topic_watchers = 4 [default = false];
+ optional bool supports_get_partitioned_metadata_without_auto_creation = 5 [default = false];
}
message CommandConnected {
@@ -413,6 +414,7 @@ message CommandPartitionedTopicMetadata {
// to the proxy.
optional string original_auth_data = 4;
optional string original_auth_method = 5;
+ optional bool metadata_auto_creation_enabled = 6 [default = true];
}
message CommandPartitionedTopicMetadataResponse {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
index 835045f9167dd..485bea3f1addb 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
@@ -267,6 +267,12 @@ public void testFromPersistenceNamingEncoding() {
} catch (IllegalArgumentException e) {
// Exception is expected.
}
+
+ // case5: local name with special characters e.g. a:b:c
+ String topicName = "persistent://tenant/namespace/a:b:c";
+ String persistentNamingEncoding = "tenant/namespace/persistent/a%3Ab%3Ac";
+ assertEquals(TopicName.get(topicName).getPersistenceNamingEncoding(), persistentNamingEncoding);
+ assertEquals(TopicName.fromPersistenceNamingEncoding(persistentNamingEncoding), topicName);
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index d03f57e97205c..075e8bc9a764c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -27,6 +27,7 @@
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -40,14 +41,11 @@
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -55,7 +53,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
-import org.apache.pulsar.client.impl.ProducerBuilderImpl;
+import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicName;
@@ -77,6 +75,7 @@
import org.apache.pulsar.functions.source.PulsarFunctionRecord;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
@@ -88,6 +87,8 @@
*/
@ToString(exclude = {"pulsarAdmin"})
class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable {
+ private final ProducerBuilderFactory producerBuilderFactory;
+ private final Map producerProperties;
private InstanceConfig config;
private Logger logger;
@@ -99,7 +100,6 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
private final PulsarAdmin pulsarAdmin;
private Map> publishProducers;
private ThreadLocal