diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index f40ec20e40a9e..911a6271d6ab3 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -379,8 +379,8 @@ The Apache Software License, Version 2.0
* AirCompressor
- io.airlift-aircompressor-0.27.jar
* AsyncHttpClient
- - org.asynchttpclient-async-http-client-2.12.1.jar
- - org.asynchttpclient-async-http-client-netty-utils-2.12.1.jar
+ - org.asynchttpclient-async-http-client-2.12.4.jar
+ - org.asynchttpclient-async-http-client-netty-utils-2.12.4.jar
* Jetty
- org.eclipse.jetty-jetty-client-9.4.56.v20240826.jar
- org.eclipse.jetty-jetty-continuation-9.4.56.v20240826.jar
@@ -533,7 +533,7 @@ Protocol Buffers License
CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
* Java Annotations API
- - com.sun.activation-javax.activation-1.2.0.jar
+ - com.sun.activation-jakarta.activation-1.2.2.jar
* Java Servlet API -- javax.servlet-javax.servlet-api-3.1.0.jar
* WebSocket Server API -- javax.websocket-javax.websocket-client-api-1.0.jar
* HK2 - Dependency Injection Kernel
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt
index 35188c6848643..7138530d9ae35 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -392,8 +392,8 @@ The Apache Software License, Version 2.0
* AirCompressor
- aircompressor-0.27.jar
* AsyncHttpClient
- - async-http-client-2.12.1.jar
- - async-http-client-netty-utils-2.12.1.jar
+ - async-http-client-2.12.4.jar
+ - async-http-client-netty-utils-2.12.4.jar
* Jetty
- jetty-client-9.4.56.v20240826.jar
- jetty-http-9.4.56.v20240826.jar
@@ -423,7 +423,7 @@ MIT License
CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
* Java Annotations API
- - javax.activation-1.2.0.jar
+ - jakarta.activation-1.2.2.jar
* WebSocket Server API -- javax.websocket-client-api-1.0.jar
* HK2 - Dependency Injection Kernel
- hk2-api-2.6.1.jar
diff --git a/pom.xml b/pom.xml
index 76d90aaa07699..41a12044c2208 100644
--- a/pom.xml
+++ b/pom.xml
@@ -212,7 +212,7 @@ flexible messaging model and an intuitive client API.
0.16.16.2.80.27
- 2.12.1
+ 2.12.41.823.111.10
@@ -228,10 +228,9 @@ flexible messaging model and an intuitive client API.
0.9.12.1.03.24.2
- 1.18.32
+ 1.18.361.3.52.3.1
- 1.2.01.2.22.3.32.0.2
@@ -1336,12 +1335,6 @@ flexible messaging model and an intuitive client API.
${jakarta.xml.bind.version}
-
- com.sun.activation
- javax.activation
- ${javax.activation.version}
-
-
com.sun.activationjakarta.activation
diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java
index 426e575004c98..07f1054c4e35d 100644
--- a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java
+++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java
@@ -173,6 +173,7 @@ public void initialize(ServiceConfiguration config) throws IOException {
.build();
}
AsyncHttpClientConfig clientConfig = new DefaultAsyncHttpClientConfig.Builder()
+ .setCookieStore(null)
.setConnectTimeout(connectionTimeout)
.setReadTimeout(readTimeout)
.setSslContext(sslContext)
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index dc87ea7803580..913c5c3d613f5 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -428,7 +428,7 @@
com.sun.activation
- javax.activation
+ jakarta.activation
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 60f45dd2e1261..ca65f7f0ca3bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1988,9 +1988,13 @@ CompletableFuture startReplicator(String remoteCluster) {
final CompletableFuture future = new CompletableFuture<>();
String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
+ String replicationStartAt = getBrokerService().getPulsar().getConfiguration().getReplicationStartAt();
final InitialPosition initialPosition;
- if (MessageId.earliest.toString()
- .equalsIgnoreCase(getBrokerService().getPulsar().getConfiguration().getReplicationStartAt())) {
+ // "MessageId.earliest.toString()" is "-1:-1:-1", which is not suggested, just guarantee compatibility with the
+ // previous version.
+ // "InitialPosition.Earliest.name()" is "Earliest", which is suggested.
+ if (MessageId.earliest.toString().equalsIgnoreCase(replicationStartAt)
+ || InitialPosition.Earliest.name().equalsIgnoreCase(replicationStartAt)) {
initialPosition = InitialPosition.Earliest;
} else {
initialPosition = InitialPosition.Latest;
@@ -3379,7 +3383,8 @@ public CompletableFuture checkTimeBacklogExceeded(boolean shouldUpdateO
});
}
- private EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaCheck(
+ @VisibleForTesting
+ EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaCheck(
PositionImpl markDeletePosition)
throws ExecutionException, InterruptedException {
int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
@@ -3399,7 +3404,8 @@ private EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaC
// if the mark-delete position is the last entry it means all entries for
// that ledger are acknowledged
- if (markDeletePosition.getEntryId() == markDeletePositionLedgerInfo.getEntries() - 1) {
+ if (markDeletePositionLedgerInfo != null
+ && (markDeletePosition.getEntryId() == markDeletePositionLedgerInfo.getEntries() - 1)) {
PositionImpl positionToCheck = managedLedger.getNextValidPosition(markDeletePosition);
positionToCheckLedgerInfo = ledger.getLedgerInfo(positionToCheck.getLedgerId()).get();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 79b856b1da07b..e1a626900948c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -993,9 +993,9 @@ public void testConfigReplicationStartAt() throws Exception {
disableReplication(topic1);
// 2.Update config: start at "earliest".
- admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.earliest.toString());
+ admin1.brokers().updateDynamicConfiguration("replicationStartAt", "earliest");
Awaitility.await().untilAsserted(() -> {
- pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("earliest");
+ assertEquals(pulsar1.getConfiguration().getReplicationStartAt(), "earliest");
});
final String topic2 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
new file mode 100644
index 0000000000000..b582eb94d1264
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.testng.Assert.assertEquals;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class PersistentTopicProtectedMethodsTest extends ProducerConsumerBase {
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ protected void doInitConf() throws Exception {
+ this.conf.setPreciseTimeBasedBacklogQuotaCheck(true);
+ this.conf.setManagedLedgerMaxEntriesPerLedger(2);
+ this.conf.setManagedLedgerMaxLedgerRolloverTimeMinutes(10);
+ this.conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+ }
+
+ /***
+ * Background: the steps for checking backlog metadata are as follows.
+ * - Get the oldest cursor.
+ * - Return the result if the oldest `cursor.md` equals LAC.
+ * - Else, calculate the estimated backlog quota.
+ *
+ * What case been covered by this test.
+ * - The method `PersistentTopic.estimatedTimeBasedBacklogQuotaCheck` may get an NPE when the
+ * `@param position(cursor.markDeletedPositon)` equals LAC and the latest ledger has been removed by a
+ * `ML.trimLedgers`, which was introduced by https://github.com/apache/pulsar/pull/21816.
+ * - Q: The broker checked whether the oldest `cursor.md` equals LAC at step 2 above, why does it still call
+ * `PersistentTopic.estimatedTimeBasedBacklogQuotaCheck` with a param that equals `LAC`?
+ * - A: There may be some `acknowledgments` and `ML.trimLedgers` that happened between `step2 above and step 3`.
+ */
+ @Test
+ public void testEstimatedTimeBasedBacklogQuotaCheckWhenNoBacklog() throws Exception {
+ final String tp = BrokerTestUtil.newUniqueName("public/default/tp");
+ admin.topics().createNonPartitionedTopic(tp);
+ PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(tp, false).join().get();
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+ Consumer c1 = pulsarClient.newConsumer().topic(tp).subscriptionName("s1").subscribe();
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get("s1");
+
+ // Generated multi ledgers.
+ Producer p1 = pulsarClient.newProducer().topic(tp).create();
+ byte[] content = new byte[]{1};
+ for (int i = 0; i < 10; i++) {
+ p1.send(content);
+ }
+
+ // Consume all messages.
+ // Trim ledgers, then the LAC relates to a ledger who has been deleted.
+ admin.topics().skipAllMessages(tp, "s1");
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(cursor.getNumberOfEntriesInBacklog(true), 0);
+ assertEquals(cursor.getMarkDeletedPosition(), ml.getLastConfirmedEntry());
+ });
+ CompletableFuture completableFuture = new CompletableFuture();
+ ml.trimConsumedLedgersInBackground(completableFuture);
+ completableFuture.join();
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml.getLedgersInfo().size(), 1);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(true), 0);
+ assertEquals(cursor.getMarkDeletedPosition(), ml.getLastConfirmedEntry());
+ });
+
+ // Verify: "persistentTopic.estimatedTimeBasedBacklogQuotaCheck" will not get a NullPointerException.
+ PositionImpl oldestPosition = ml.getCursors().getCursorWithOldestPosition().getPosition();
+ persistentTopic.estimatedTimeBasedBacklogQuotaCheck(oldestPosition);
+
+ p1.close();
+ c1.close();
+ admin.topics().delete(tp, false);
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 9319ea4e876b7..f132aef96bd2a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -1061,6 +1061,7 @@ public void onThrowable(Throwable t) {
private AsyncHttpClient getHttpClient(String version) {
DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
+ confBuilder.setCookieStore(null);
confBuilder.setUseProxyProperties(true);
confBuilder.setFollowRedirect(true);
confBuilder.setUserAgent(version);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index f266afd8a2ee1..47e4f0ed944e3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -543,8 +543,12 @@ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
clientBuilder.memoryLimit(10000L, SizeUnit.BYTES);
}
+ interface ThrowingBiConsumer {
+ void accept(T t, U u) throws Exception;
+ }
+
@Test
- public void testSeekChunkMessages() throws PulsarClientException {
+ public void testSeekChunkMessages() throws Exception {
log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(50);
final int totalMessages = 5;
@@ -594,14 +598,17 @@ public void testSeekChunkMessages() throws PulsarClientException {
assertEquals(msgIds.get(i), msgAfterSeek.getMessageId());
}
- Reader reader = pulsarClient.newReader()
- .topic(topicName)
- .startMessageIdInclusive()
- .startMessageId(msgIds.get(1))
- .create();
-
- Message readMsg = reader.readNext(5, TimeUnit.SECONDS);
- assertEquals(msgIds.get(1), readMsg.getMessageId());
+ ThrowingBiConsumer assertStartMessageId = (inclusive, expectedFirstMsgId) -> {
+ final var builder = pulsarClient.newReader().topic(topicName).startMessageId(msgIds.get(1));
+ if (inclusive) {
+ builder.startMessageIdInclusive();
+ }
+ @Cleanup final var reader = builder.create();
+ final var readMsg = reader.readNext(5, TimeUnit.SECONDS);
+ assertEquals(expectedFirstMsgId, readMsg.getMessageId());
+ };
+ assertStartMessageId.accept(true, msgIds.get(1));
+ assertStartMessageId.accept(false, msgIds.get(2));
consumer1.close();
consumer2.close();
diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml
index 4a910d0d86f55..6fadddc60b126 100644
--- a/pulsar-client-admin-shaded/pom.xml
+++ b/pulsar-client-admin-shaded/pom.xml
@@ -111,7 +111,7 @@
com.google.guava:guavacom.spotify:completable-futurescom.squareup.*:*
- com.sun.activation:javax.activation
+ com.sun.activation:jakarta.activationcom.typesafe.netty:netty-reactive-streamscom.yahoo.datasketches:*com.yahoo.datasketches:sketches-core
diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml
index a6655a212f09a..7287d51124fa3 100644
--- a/pulsar-client-admin/pom.xml
+++ b/pulsar-client-admin/pom.xml
@@ -88,7 +88,7 @@
com.sun.activation
- javax.activation
+ jakarta.activationruntime
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index ceb7ce424195e..60e1b43442428 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -157,6 +157,7 @@ private void configureAsyncHttpClientConfig(ClientConfigurationData conf, int co
if (conf.getConnectionMaxIdleSeconds() > 0) {
confBuilder.setPooledConnectionIdleTimeout(conf.getConnectionMaxIdleSeconds() * 1000);
}
+ confBuilder.setCookieStore(null);
confBuilder.setUseProxyProperties(true);
confBuilder.setFollowRedirect(false);
confBuilder.setRequestTimeout(conf.getRequestTimeoutMs());
diff --git a/pulsar-client-all/pom.xml b/pulsar-client-all/pom.xml
index af80260633907..e7d12a71ff11e 100644
--- a/pulsar-client-all/pom.xml
+++ b/pulsar-client-all/pom.xml
@@ -152,7 +152,7 @@
com.google.j2objc:*com.spotify:completable-futurescom.squareup.*:*
- com.sun.activation:javax.activation
+ com.sun.activation:jakarta.activationcom.thoughtworks.paranamer:paranamercom.typesafe.netty:netty-reactive-streams
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 7bf1a03d3f455..7b70017a84358 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -437,9 +437,8 @@ ClientBuilder authentication(String authPluginClassName, Map aut
ClientBuilder tlsProtocols(Set tlsProtocols);
/**
- * Configure a limit on the amount of direct memory that will be allocated by this client instance.
- *
- * Note: at this moment this is only limiting the memory for producers.
+ * Configure a limit on the amount of direct memory that will be allocated by this client instance
+ * (default: 64 MB).
*
* Setting this to 0 will disable the limit.
*
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index e2a166572e60a..5318dde28dbd2 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -837,13 +837,16 @@ public interface ConsumerBuilder extends Cloneable {
/**
* If this is enabled, the consumer receiver queue size is initialized as a very small value, 1 by default,
- * and will double itself until it reaches the value set by {@link #receiverQueueSize(int)}, if and only if:
+ * and will double itself until it reaches either the value set by {@link #receiverQueueSize(int)} or the client
+ * memory limit set by {@link ClientBuilder#memoryLimit(long, SizeUnit)}.
+ *
+ *
The consumer receiver queue size will double if and only if:
*
1) User calls receive() and there are no messages in receiver queue.
*
2) The last message we put in the receiver queue took the last space available in receiver queue.
*
- * This is disabled by default and currentReceiverQueueSize is initialized as maxReceiverQueueSize.
+ *
This is disabled by default and currentReceiverQueueSize is initialized as maxReceiverQueueSize.
*
- * The feature should be able to reduce client memory usage.
+ *
The feature should be able to reduce client memory usage.
*
* @param enabled whether to enable AutoScaledReceiverQueueSize.
*/
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index 40ae7c72873d1..39062bc1ee60e 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -126,7 +126,7 @@
com.google.guava:*com.google.j2objc:*com.spotify:completable-futures
- com.sun.activation:javax.activation
+ com.sun.activation:jakarta.activationcom.thoughtworks.paranamer:paranamercom.typesafe.netty:netty-reactive-streams
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index fdc966f4f2e02..551dd669a06f0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -283,7 +283,13 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
this.subscriptionMode = conf.getSubscriptionMode();
if (startMessageId != null) {
MessageIdAdv firstChunkMessageId = ((MessageIdAdv) startMessageId).getFirstChunkMessageId();
- this.startMessageId = (firstChunkMessageId == null) ? (MessageIdAdv) startMessageId : firstChunkMessageId;
+ if (conf.isResetIncludeHead() && firstChunkMessageId != null) {
+ // The chunk message id's ledger id and entry id are the last chunk's ledger id and entry id, when
+ // startMessageIdInclusive() is enabled, we need to start from the first chunk's message id
+ this.startMessageId = firstChunkMessageId;
+ } else {
+ this.startMessageId = (MessageIdAdv) startMessageId;
+ }
}
this.initialStartMessageId = this.startMessageId;
this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
index 080d328e3f02c..1674f7469affc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
@@ -85,6 +85,7 @@ private ControlledClusterFailover(ControlledClusterFailoverBuilderImpl builder)
private AsyncHttpClient buildHttpClient() {
DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
+ confBuilder.setCookieStore(null);
confBuilder.setUseProxyProperties(true);
confBuilder.setFollowRedirect(true);
confBuilder.setMaxRedirects(DEFAULT_MAX_REDIRECTS);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 38b8954377957..ea45fe8981e34 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -73,6 +73,7 @@ protected HttpClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup
this.serviceNameResolver.updateServiceUrl(conf.getServiceUrl());
DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
+ confBuilder.setCookieStore(null);
confBuilder.setUseProxyProperties(true);
confBuilder.setFollowRedirect(true);
confBuilder.setMaxRedirects(conf.getMaxLookupRedirects());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
index 768f6d31d18a2..da596ce0985e3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
@@ -53,6 +53,7 @@ public TokenClient(URL tokenUrl) {
TokenClient(URL tokenUrl, AsyncHttpClient httpClient) {
if (httpClient == null) {
DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
+ confBuilder.setCookieStore(null);
confBuilder.setUseProxyProperties(true);
confBuilder.setFollowRedirect(true);
confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index 20b71eb189bd9..aadb9d15af6e3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -170,9 +170,9 @@ private TopicName(String completeTopicName) {
throw new IllegalArgumentException("Invalid topic name: " + completeTopicName);
}
-
- if (localName == null || localName.isEmpty()) {
- throw new IllegalArgumentException("Invalid topic name: " + completeTopicName);
+ if (StringUtils.isBlank(localName)) {
+ throw new IllegalArgumentException(String.format("Invalid topic name: %s. Topic local name must not"
+ + " be blank.", completeTopicName));
}
} catch (NullPointerException e) {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
index 485bea3f1addb..27eb82d15af0d 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
@@ -177,6 +177,13 @@ public void topic() {
// Ok
}
+ try {
+ TopicName.get(" ");
+ fail("Should have raised exception");
+ } catch (IllegalArgumentException e) {
+ // Ok
+ }
+
TopicName nameWithSlash = TopicName.get("persistent://tenant/cluster/namespace/ns-abc/table/1");
assertEquals(nameWithSlash.getEncodedLocalName(), Codec.encode("ns-abc/table/1"));
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java
index 7af238154d634..5856600196b49 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java
@@ -27,13 +27,11 @@
*/
@Data
public class JavaExecutionResult {
- private Exception userException;
- private Exception systemException;
+ private Throwable userException;
private Object result;
public void reset() {
setUserException(null);
- setSystemException(null);
setResult(null);
}
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 292f52b5091b9..5946be9fe5be9 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -25,11 +25,13 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
@@ -57,13 +59,26 @@ public static class AsyncFuncRequest {
private final ExecutorService executor;
@Getter
private final LinkedBlockingQueue pendingAsyncRequests;
+ @Getter
+ private final Semaphore asyncRequestsConcurrencyLimiter;
+ private final boolean asyncPreserveInputOrderForOutputMessages;
public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceConfig instanceConfig) {
this.context = contextImpl;
this.instanceConfig = instanceConfig;
this.executor = Executors.newSingleThreadExecutor();
- this.pendingAsyncRequests = new LinkedBlockingQueue<>(this.instanceConfig.getMaxPendingAsyncRequests());
+
+ asyncPreserveInputOrderForOutputMessages =
+ resolveAsyncPreserveInputOrderForOutputMessages(instanceConfig);
+
+ if (asyncPreserveInputOrderForOutputMessages) {
+ this.pendingAsyncRequests = new LinkedBlockingQueue<>(this.instanceConfig.getMaxPendingAsyncRequests());
+ this.asyncRequestsConcurrencyLimiter = null;
+ } else {
+ this.pendingAsyncRequests = null;
+ this.asyncRequestsConcurrencyLimiter = new Semaphore(this.instanceConfig.getMaxPendingAsyncRequests());
+ }
// create the functions
if (userClassObject instanceof Function) {
@@ -73,6 +88,20 @@ public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceCon
}
}
+ // resolve whether to preserve input order for output messages for async functions
+ private boolean resolveAsyncPreserveInputOrderForOutputMessages(InstanceConfig instanceConfig) {
+ // no need to preserve input order for output messages if the function returns Void type
+ boolean voidReturnType = instanceConfig.getFunctionDetails() != null
+ && instanceConfig.getFunctionDetails().getSink() != null
+ && Void.class.getName().equals(instanceConfig.getFunctionDetails().getSink().getTypeClassName());
+ if (voidReturnType) {
+ return false;
+ }
+
+ // preserve input order for output messages
+ return true;
+ }
+
@VisibleForTesting
public JavaExecutionResult handleMessage(Record> record, Object input) {
return handleMessage(record, input, (rec, result) -> {
@@ -103,15 +132,33 @@ public JavaExecutionResult handleMessage(Record> record, Object input,
}
if (output instanceof CompletableFuture) {
- // Function is in format: Function>
- AsyncFuncRequest request = new AsyncFuncRequest(
- record, (CompletableFuture) output
- );
try {
- pendingAsyncRequests.put(request);
- ((CompletableFuture) output).whenCompleteAsync((res, cause) -> {
+ if (asyncPreserveInputOrderForOutputMessages) {
+ // Function is in format: Function>
+ AsyncFuncRequest request = new AsyncFuncRequest(
+ record, (CompletableFuture) output
+ );
+ pendingAsyncRequests.put(request);
+ } else {
+ asyncRequestsConcurrencyLimiter.acquire();
+ }
+ ((CompletableFuture