Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][misc] Sync commits from apache into 3.1_ds (19 Dec) #353

Merged
merged 10 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ The Apache Software License, Version 2.0
* AirCompressor
- io.airlift-aircompressor-0.27.jar
* AsyncHttpClient
- org.asynchttpclient-async-http-client-2.12.1.jar
- org.asynchttpclient-async-http-client-netty-utils-2.12.1.jar
- org.asynchttpclient-async-http-client-2.12.4.jar
- org.asynchttpclient-async-http-client-netty-utils-2.12.4.jar
* Jetty
- org.eclipse.jetty-jetty-client-9.4.56.v20240826.jar
- org.eclipse.jetty-jetty-continuation-9.4.56.v20240826.jar
Expand Down Expand Up @@ -533,7 +533,7 @@ Protocol Buffers License

CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
* Java Annotations API
- com.sun.activation-javax.activation-1.2.0.jar
- com.sun.activation-jakarta.activation-1.2.2.jar
* Java Servlet API -- javax.servlet-javax.servlet-api-3.1.0.jar
* WebSocket Server API -- javax.websocket-javax.websocket-client-api-1.0.jar
* HK2 - Dependency Injection Kernel
Expand Down
6 changes: 3 additions & 3 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,8 @@ The Apache Software License, Version 2.0
* AirCompressor
- aircompressor-0.27.jar
* AsyncHttpClient
- async-http-client-2.12.1.jar
- async-http-client-netty-utils-2.12.1.jar
- async-http-client-2.12.4.jar
- async-http-client-netty-utils-2.12.4.jar
* Jetty
- jetty-client-9.4.56.v20240826.jar
- jetty-http-9.4.56.v20240826.jar
Expand Down Expand Up @@ -423,7 +423,7 @@ MIT License

CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
* Java Annotations API
- javax.activation-1.2.0.jar
- jakarta.activation-1.2.2.jar
* WebSocket Server API -- javax.websocket-client-api-1.0.jar
* HK2 - Dependency Injection Kernel
- hk2-api-2.6.1.jar
Expand Down
11 changes: 2 additions & 9 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ flexible messaging model and an intuitive client API.</description>
<prometheus-jmx.version>0.16.1</prometheus-jmx.version>
<confluent.version>6.2.8</confluent.version>
<aircompressor.version>0.27</aircompressor.version>
<asynchttpclient.version>2.12.1</asynchttpclient.version>
<asynchttpclient.version>2.12.4</asynchttpclient.version>
<jcommander.version>1.82</jcommander.version>
<commons-lang3.version>3.11</commons-lang3.version>
<commons-configuration.version>1.10</commons-configuration.version>
Expand All @@ -228,10 +228,9 @@ flexible messaging model and an intuitive client API.</description>
<hppc.version>0.9.1</hppc.version>
<spark-streaming_2.10.version>2.1.0</spark-streaming_2.10.version>
<assertj-core.version>3.24.2</assertj-core.version>
<lombok.version>1.18.32</lombok.version>
<lombok.version>1.18.36</lombok.version>
<jakarta.annotation-api.version>1.3.5</jakarta.annotation-api.version>
<jaxb-api>2.3.1</jaxb-api>
<javax.activation.version>1.2.0</javax.activation.version>
<jakarta.activation.version>1.2.2</jakarta.activation.version>
<jakarta.xml.bind.version>2.3.3</jakarta.xml.bind.version>
<jakarta.validation.version>2.0.2</jakarta.validation.version>
Expand Down Expand Up @@ -1336,12 +1335,6 @@ flexible messaging model and an intuitive client API.</description>
<version>${jakarta.xml.bind.version}</version>
</dependency>

<dependency>
<groupId>com.sun.activation</groupId>
<artifactId>javax.activation</artifactId>
<version>${javax.activation.version}</version>
</dependency>

<dependency>
<groupId>com.sun.activation</groupId>
<artifactId>jakarta.activation</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public void initialize(ServiceConfiguration config) throws IOException {
.build();
}
AsyncHttpClientConfig clientConfig = new DefaultAsyncHttpClientConfig.Builder()
.setCookieStore(null)
.setConnectTimeout(connectionTimeout)
.setReadTimeout(readTimeout)
.setSslContext(sslContext)
Expand Down
2 changes: 1 addition & 1 deletion pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@

<dependency>
<groupId>com.sun.activation</groupId>
<artifactId>javax.activation</artifactId>
<artifactId>jakarta.activation</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1988,9 +1988,13 @@ CompletableFuture<Void> startReplicator(String remoteCluster) {
final CompletableFuture<Void> future = new CompletableFuture<>();

String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
String replicationStartAt = getBrokerService().getPulsar().getConfiguration().getReplicationStartAt();
final InitialPosition initialPosition;
if (MessageId.earliest.toString()
.equalsIgnoreCase(getBrokerService().getPulsar().getConfiguration().getReplicationStartAt())) {
// "MessageId.earliest.toString()" is "-1:-1:-1", which is not suggested, just guarantee compatibility with the
// previous version.
// "InitialPosition.Earliest.name()" is "Earliest", which is suggested.
if (MessageId.earliest.toString().equalsIgnoreCase(replicationStartAt)
|| InitialPosition.Earliest.name().equalsIgnoreCase(replicationStartAt)) {
initialPosition = InitialPosition.Earliest;
} else {
initialPosition = InitialPosition.Latest;
Expand Down Expand Up @@ -3379,7 +3383,8 @@ public CompletableFuture<Boolean> checkTimeBacklogExceeded(boolean shouldUpdateO
});
}

private EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaCheck(
@VisibleForTesting
EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaCheck(
PositionImpl markDeletePosition)
throws ExecutionException, InterruptedException {
int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
Expand All @@ -3399,7 +3404,8 @@ private EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaC

// if the mark-delete position is the last entry it means all entries for
// that ledger are acknowledged
if (markDeletePosition.getEntryId() == markDeletePositionLedgerInfo.getEntries() - 1) {
if (markDeletePositionLedgerInfo != null
&& (markDeletePosition.getEntryId() == markDeletePositionLedgerInfo.getEntries() - 1)) {
PositionImpl positionToCheck = managedLedger.getNextValidPosition(markDeletePosition);
positionToCheckLedgerInfo = ledger.getLedgerInfo(positionToCheck.getLedgerId()).get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -993,9 +993,9 @@ public void testConfigReplicationStartAt() throws Exception {
disableReplication(topic1);

// 2.Update config: start at "earliest".
admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.earliest.toString());
admin1.brokers().updateDynamicConfiguration("replicationStartAt", "earliest");
Awaitility.await().untilAsserted(() -> {
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("earliest");
assertEquals(pulsar1.getConfiguration().getReplicationStartAt(), "earliest");
});

final String topic2 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;

import static org.testng.Assert.assertEquals;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker")
public class PersistentTopicProtectedMethodsTest extends ProducerConsumerBase {

@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

protected void doInitConf() throws Exception {
this.conf.setPreciseTimeBasedBacklogQuotaCheck(true);
this.conf.setManagedLedgerMaxEntriesPerLedger(2);
this.conf.setManagedLedgerMaxLedgerRolloverTimeMinutes(10);
this.conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
}

/***
* Background: the steps for checking backlog metadata are as follows.
* - Get the oldest cursor.
* - Return the result if the oldest `cursor.md` equals LAC.
* - Else, calculate the estimated backlog quota.
*
* What case been covered by this test.
* - The method `PersistentTopic.estimatedTimeBasedBacklogQuotaCheck` may get an NPE when the
* `@param position(cursor.markDeletedPositon)` equals LAC and the latest ledger has been removed by a
* `ML.trimLedgers`, which was introduced by https://github.com/apache/pulsar/pull/21816.
* - Q: The broker checked whether the oldest `cursor.md` equals LAC at step 2 above, why does it still call
* `PersistentTopic.estimatedTimeBasedBacklogQuotaCheck` with a param that equals `LAC`?
* - A: There may be some `acknowledgments` and `ML.trimLedgers` that happened between `step2 above and step 3`.
*/
@Test
public void testEstimatedTimeBasedBacklogQuotaCheckWhenNoBacklog() throws Exception {
final String tp = BrokerTestUtil.newUniqueName("public/default/tp");
admin.topics().createNonPartitionedTopic(tp);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(tp, false).join().get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
Consumer c1 = pulsarClient.newConsumer().topic(tp).subscriptionName("s1").subscribe();
ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get("s1");

// Generated multi ledgers.
Producer<byte[]> p1 = pulsarClient.newProducer().topic(tp).create();
byte[] content = new byte[]{1};
for (int i = 0; i < 10; i++) {
p1.send(content);
}

// Consume all messages.
// Trim ledgers, then the LAC relates to a ledger who has been deleted.
admin.topics().skipAllMessages(tp, "s1");
Awaitility.await().untilAsserted(() -> {
assertEquals(cursor.getNumberOfEntriesInBacklog(true), 0);
assertEquals(cursor.getMarkDeletedPosition(), ml.getLastConfirmedEntry());
});
CompletableFuture completableFuture = new CompletableFuture();
ml.trimConsumedLedgersInBackground(completableFuture);
completableFuture.join();
Awaitility.await().untilAsserted(() -> {
assertEquals(ml.getLedgersInfo().size(), 1);
assertEquals(cursor.getNumberOfEntriesInBacklog(true), 0);
assertEquals(cursor.getMarkDeletedPosition(), ml.getLastConfirmedEntry());
});

// Verify: "persistentTopic.estimatedTimeBasedBacklogQuotaCheck" will not get a NullPointerException.
PositionImpl oldestPosition = ml.getCursors().getCursorWithOldestPosition().getPosition();
persistentTopic.estimatedTimeBasedBacklogQuotaCheck(oldestPosition);

p1.close();
c1.close();
admin.topics().delete(tp, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,7 @@ public void onThrowable(Throwable t) {

private AsyncHttpClient getHttpClient(String version) {
DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
confBuilder.setCookieStore(null);
confBuilder.setUseProxyProperties(true);
confBuilder.setFollowRedirect(true);
confBuilder.setUserAgent(version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,12 @@ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
clientBuilder.memoryLimit(10000L, SizeUnit.BYTES);
}

interface ThrowingBiConsumer<T, U> {
void accept(T t, U u) throws Exception;
}

@Test
public void testSeekChunkMessages() throws PulsarClientException {
public void testSeekChunkMessages() throws Exception {
log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(50);
final int totalMessages = 5;
Expand Down Expand Up @@ -594,14 +598,17 @@ public void testSeekChunkMessages() throws PulsarClientException {
assertEquals(msgIds.get(i), msgAfterSeek.getMessageId());
}

Reader<byte[]> reader = pulsarClient.newReader()
.topic(topicName)
.startMessageIdInclusive()
.startMessageId(msgIds.get(1))
.create();

Message<byte[]> readMsg = reader.readNext(5, TimeUnit.SECONDS);
assertEquals(msgIds.get(1), readMsg.getMessageId());
ThrowingBiConsumer<Boolean, MessageId> assertStartMessageId = (inclusive, expectedFirstMsgId) -> {
final var builder = pulsarClient.newReader().topic(topicName).startMessageId(msgIds.get(1));
if (inclusive) {
builder.startMessageIdInclusive();
}
@Cleanup final var reader = builder.create();
final var readMsg = reader.readNext(5, TimeUnit.SECONDS);
assertEquals(expectedFirstMsgId, readMsg.getMessageId());
};
assertStartMessageId.accept(true, msgIds.get(1));
assertStartMessageId.accept(false, msgIds.get(2));

consumer1.close();
consumer2.close();
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-admin-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
<include>com.google.guava:guava</include>
<include>com.spotify:completable-futures</include>
<include>com.squareup.*:*</include>
<include>com.sun.activation:javax.activation</include>
<include>com.sun.activation:jakarta.activation</include>
<include>com.typesafe.netty:netty-reactive-streams</include>
<include>com.yahoo.datasketches:*</include>
<include>com.yahoo.datasketches:sketches-core</include>
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
</dependency>
<dependency>
<groupId>com.sun.activation</groupId>
<artifactId>javax.activation</artifactId>
<artifactId>jakarta.activation</artifactId>
<scope>runtime</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ private void configureAsyncHttpClientConfig(ClientConfigurationData conf, int co
if (conf.getConnectionMaxIdleSeconds() > 0) {
confBuilder.setPooledConnectionIdleTimeout(conf.getConnectionMaxIdleSeconds() * 1000);
}
confBuilder.setCookieStore(null);
confBuilder.setUseProxyProperties(true);
confBuilder.setFollowRedirect(false);
confBuilder.setRequestTimeout(conf.getRequestTimeoutMs());
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@
<include>com.google.j2objc:*</include>
<include>com.spotify:completable-futures</include>
<include>com.squareup.*:*</include>
<include>com.sun.activation:javax.activation</include>
<include>com.sun.activation:jakarta.activation</include>
<!-- Avro transitive dependencies -->
<include>com.thoughtworks.paranamer:paranamer</include>
<include>com.typesafe.netty:netty-reactive-streams</include>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,8 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
ClientBuilder tlsProtocols(Set<String> tlsProtocols);

/**
* Configure a limit on the amount of direct memory that will be allocated by this client instance.
* <p>
* <b>Note: at this moment this is only limiting the memory for producers.</b>
* Configure a limit on the amount of direct memory that will be allocated by this client instance
* <i>(default: 64 MB)</i>.
* <p>
* Setting this to 0 will disable the limit.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,13 +837,16 @@ public interface ConsumerBuilder<T> extends Cloneable {

/**
* If this is enabled, the consumer receiver queue size is initialized as a very small value, 1 by default,
* and will double itself until it reaches the value set by {@link #receiverQueueSize(int)}, if and only if:
* and will double itself until it reaches either the value set by {@link #receiverQueueSize(int)} or the client
* memory limit set by {@link ClientBuilder#memoryLimit(long, SizeUnit)}.
*
* <p>The consumer receiver queue size will double if and only if:
* <p>1) User calls receive() and there are no messages in receiver queue.
* <p>2) The last message we put in the receiver queue took the last space available in receiver queue.
*
* This is disabled by default and currentReceiverQueueSize is initialized as maxReceiverQueueSize.
* <p>This is disabled by default and currentReceiverQueueSize is initialized as maxReceiverQueueSize.
*
* The feature should be able to reduce client memory usage.
* <p>The feature should be able to reduce client memory usage.
*
* @param enabled whether to enable AutoScaledReceiverQueueSize.
*/
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@
<include>com.google.guava:*</include>
<include>com.google.j2objc:*</include>
<include>com.spotify:completable-futures</include>
<include>com.sun.activation:javax.activation</include>
<include>com.sun.activation:jakarta.activation</include>
<!-- Avro transitive dependencies -->
<include>com.thoughtworks.paranamer:paranamer</include>
<include>com.typesafe.netty:netty-reactive-streams</include>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,13 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
this.subscriptionMode = conf.getSubscriptionMode();
if (startMessageId != null) {
MessageIdAdv firstChunkMessageId = ((MessageIdAdv) startMessageId).getFirstChunkMessageId();
this.startMessageId = (firstChunkMessageId == null) ? (MessageIdAdv) startMessageId : firstChunkMessageId;
if (conf.isResetIncludeHead() && firstChunkMessageId != null) {
// The chunk message id's ledger id and entry id are the last chunk's ledger id and entry id, when
// startMessageIdInclusive() is enabled, we need to start from the first chunk's message id
this.startMessageId = firstChunkMessageId;
} else {
this.startMessageId = (MessageIdAdv) startMessageId;
}
}
this.initialStartMessageId = this.startMessageId;
this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
Expand Down
Loading
Loading