Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][misc] DO NOT MERGE #288

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
64ead41
[improve][ci] Migrate from Gradle Enterprise to Develocity (#22880)
lhotari Jun 8, 2024
65576f9
[fix] Remove blocking calls from BookieRackAffinityMapping (#22846)
merlimat Jun 5, 2024
4e98aa9
[fix][cli] Fix Pulsar standalone shutdown - bkCluster wasn't closed (…
lhotari Jun 7, 2024
40ef1ae
[fix][cli] Fix Pulsar standalone "--wipe-data" (#22885)
lhotari Jun 10, 2024
b984e6d
[improve] Upgrade IPAddress to 5.5.0 (#22886)
merlimat Jun 10, 2024
5c834f2
[fix][misc] Topic name from persistence name should decode local name…
Shawyeok Jun 11, 2024
0337bf6
[improve][broker] Optimize PersistentTopic.getLastDispatchablePositio…
dao-jun Jun 11, 2024
f17a6b2
[improve][broker] Include runtime dependencies in server distribution…
dragosvictor Jan 31, 2024
fe11f9a
[fix][broker][branch-3.0] The topic might reference a closed ledger (…
shibd Jun 13, 2024
e2c4f32
[fix][cli] Fix the pulsar-daemon parameter passthrough syntax (#22905)
coderzc Jun 14, 2024
6091cec
[improve][broker] Include runtime dependencies in server distribution…
dragosvictor Jan 31, 2024
a7bcb02
[improve][misc] Include native epoll library for Netty for arm64 (#22…
lhotari Mar 21, 2024
28fe36c
[fix][broker] Fix topic status for oldestBacklogMessageAgeSeconds con…
shibd Jun 14, 2024
568be99
fix: cannot find symbol from cherry-pick 73b50e
shibd Jun 18, 2024
4fab68d
[fix] [client] PIP-344 Do not create partitioned metadata when callin…
poorbarcode May 23, 2024
4c1f8e2
[improve] [client] PIP-344 support feature flag supportsGetPartitione…
poorbarcode Jun 6, 2024
63e2dcc
[fix] [broker] response not-found error if topic does not exist when …
poorbarcode Jun 17, 2024
134621c
[fix][client] fix producer/consumer perform lookup for migrated topic…
rdhabalia Oct 14, 2023
5775898
[fix][test] Fix thread leaks in Managed Ledger tests and remove dupli…
lhotari Oct 24, 2023
d733598
[fix] [broker] Messages lost on the remote cluster when using topic l…
poorbarcode Jun 19, 2024
3dd8e63
[fix][fn] Enable optimized Netty direct byte buffer support for Pulsa…
lhotari Jun 17, 2024
71e6d02
[fix] [proxy] Add missing parameter in newPartitionMetadataRequest call
nikhil-ctds Jun 24, 2024
ab293a2
[fix][test] Fix TableViewBuilderImplTest NPE and infinite loop (#22924)
lhotari Jun 17, 2024
0d08b5b
[fix] [client] Fix resource leak in Pulsar Client since HttpLookupSer…
poorbarcode Jun 18, 2024
00a6eb0
[fix] [broker] broker log a full thread dump when a deadlock is detec…
yyj8 Jun 20, 2024
befa38d
[fix][fn] Support compression type and crypto config for all producer…
lhotari Jun 20, 2024
c373025
[fix][broker] Check the markDeletePosition and calculate the backlog …
nodece Jun 20, 2024
47caa79
[improve][broker] Optimize `ConcurrentOpenLongPairRangeSet` by Roarin…
dao-jun Jun 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,5 @@ test-reports/

# Gradle Enterprise
.mvn/.gradle-enterprise/
# Gradle Develocity
.mvn/.develocity/
13 changes: 6 additions & 7 deletions .mvn/gradle-enterprise.xml → .mvn/develocity.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,21 @@
under the License.

-->
<gradleEnterprise
xmlns="https://www.gradle.com/gradle-enterprise-maven" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://www.gradle.com/gradle-enterprise-maven https://www.gradle.com/schema/gradle-enterprise-maven.xsd">
<develocity xmlns="https://www.gradle.com/develocity-maven" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://www.gradle.com/develocity-maven https://www.gradle.com/schema/develocity-maven.xsd">
<!-- Enable Gradle Develocity extension when GRADLE_ENTERPRISE_ACCESS_KEY/DEVELOCITY_ACCESS_KEY is set and the build isn't
a pull request from a branch or forked repository with a name that indicates it's a work in progress. -->
<enabled>#{(env['GRADLE_ENTERPRISE_ACCESS_KEY']?.trim() > '' or env['DEVELOCITY_ACCESS_KEY']?.trim() > '') and !(env['GITHUB_HEAD_REF']?.matches('(?i).*(experiment|wip|private).*') or env['GITHUB_REPOSITORY']?.matches('(?i).*(experiment|wip|private).*'))}</enabled>
<server>
<url>https://ge.apache.org</url>
<allowUntrusted>false</allowUntrusted>
</server>
<buildScan>
<capture>
<goalInputFiles>true</goalInputFiles>
<buildLogging>true</buildLogging>
<testLogging>true</testLogging>
</capture>
<backgroundBuildScanUpload>#{isFalse(env['GITHUB_ACTIONS'])}</backgroundBuildScanUpload>
<publish>ALWAYS</publish>
<publishIfAuthenticated>true</publishIfAuthenticated>
<obfuscation>
<ipAddresses>#{{'0.0.0.0'}}</ipAddresses>
</obfuscation>
Expand All @@ -47,4 +46,4 @@
<enabled>false</enabled>
</remote>
</buildCache>
</gradleEnterprise>
</develocity>
6 changes: 3 additions & 3 deletions .mvn/extensions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
xsi:schemaLocation="http://maven.apache.org/EXTENSIONS/1.0.0 http://maven.apache.org/xsd/core-extensions-1.0.0.xsd">
<extension>
<groupId>com.gradle</groupId>
<artifactId>gradle-enterprise-maven-extension</artifactId>
<version>1.17.1</version>
<artifactId>develocity-maven-extension</artifactId>
<version>1.21.4</version>
</extension>
<extension>
<groupId>com.gradle</groupId>
<artifactId>common-custom-user-data-maven-extension</artifactId>
<version>1.11.1</version>
<version>2.0</version>
</extension>
</extensions>
19 changes: 9 additions & 10 deletions bin/pulsar-daemon
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ start ()
echo starting $command, logging to $logfile
echo Note: Set immediateFlush to true in conf/log4j2.yaml will guarantee the logging event is flushing to disk immediately. The default behavior is switched off due to performance considerations.
pulsar=$PULSAR_HOME/bin/pulsar
nohup $pulsar $command "$1" > "$out" 2>&1 < /dev/null &
nohup $pulsar $command "$@" > "$out" 2>&1 < /dev/null &
echo $! > $pid
sleep 1; head $out
sleep 2;
Expand Down Expand Up @@ -216,29 +216,28 @@ stop ()

case $startStop in
(start)
start "$*"
start "$@"
;;

(stop)
stop $1
;;

(restart)
forceStopFlag=$(echo "$*"|grep "\-force")
if [[ "$forceStopFlag" != "" ]]
if [[ "$1" == "-force" ]]
then
stop "-force"
stop -force
# remove "-force" from the arguments
shift
else
stop
fi
if [ "$?" == 0 ]
then
sleep 3
paramaters="$*"
startParamaters=${paramaters//-force/}
start "$startParamaters"
sleep 3
start "$@"
else
echo "WARNNING : $command failed restart, for $command is not stopped completely."
echo "WARNNING : $command failed restart, for $command is not stopped completely."
fi
;;

Expand Down
6 changes: 3 additions & 3 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ The Apache Software License, Version 2.0
- io.netty-netty-resolver-dns-native-macos-4.1.108.Final-osx-x86_64.jar
- io.netty-netty-transport-4.1.108.Final.jar
- io.netty-netty-transport-classes-epoll-4.1.108.Final.jar
- io.netty-netty-transport-native-epoll-4.1.108.Final-linux-aarch_64.jar
- io.netty-netty-transport-native-epoll-4.1.108.Final-linux-x86_64.jar
- io.netty-netty-transport-native-unix-common-4.1.108.Final.jar
- io.netty-netty-transport-native-unix-common-4.1.108.Final-linux-x86_64.jar
Expand Down Expand Up @@ -500,11 +501,11 @@ The Apache Software License, Version 2.0
- io.etcd-jetcd-core-0.7.7.jar
- io.etcd-jetcd-grpc-0.7.7.jar
* IPAddress
- com.github.seancfoley-ipaddress-5.3.3.jar
- com.github.seancfoley-ipaddress-5.5.0.jar
* RxJava
- io.reactivex.rxjava3-rxjava-3.0.1.jar
* RoaringBitmap
- org.roaringbitmap-RoaringBitmap-0.9.44.jar
- org.roaringbitmap-RoaringBitmap-1.1.0.jar

BSD 3-clause "New" or "Revised" License
* Google auth library
Expand Down Expand Up @@ -537,7 +538,6 @@ Protocol Buffers License

CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
* Java Annotations API
- javax.annotation-javax.annotation-api-1.3.2.jar
- com.sun.activation-javax.activation-1.2.0.jar
- javax.xml.bind-jaxb-api-2.3.1.jar
* Java Servlet API -- javax.servlet-javax.servlet-api-3.1.0.jar
Expand Down
10 changes: 6 additions & 4 deletions distribution/server/src/assemble/bin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,21 +126,23 @@
<dependencySet>
<outputDirectory>lib</outputDirectory>
<unpack>false</unpack>
<scope>compile</scope>
<scope>runtime</scope>
<useProjectArtifact>false</useProjectArtifact>
<!-- Include 'groupId' in the dependencies Jar names to better identify
the provenance of the jar -->
<outputFileNameMapping>${artifact.groupId}-${artifact.artifactId}-${artifact.version}${dashClassifier?}.${artifact.extension}</outputFileNameMapping>

<excludes>
<exclude>com.datastax.oss:pulsar-functions-runtime-all</exclude>

<exclude>org.projectlombok:lombok</exclude>

<!-- prevent adding pulsar-functions-api-examples in lib -->
<exclude>com.datastax.oss:pulsar-functions-api-examples</exclude>
<!-- prevent adding any distribution .tar.gz files in lib -->
<exclude>*:tar.gz</exclude>
<!-- prevent adding annotation libraries -->
<exclude>org.codehaus.mojo:animal-sniffer-annotations</exclude>
<exclude>com.google.android:annotations</exclude>
<!-- Needed only in the pulsar-shell distro only -->
<exclude>net.java.dev.jna:jna</exclude>
</excludes>
</dependencySet>
</dependencySets>
Expand Down
3 changes: 3 additions & 0 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ The Apache Software License, Version 2.0
- netty-resolver-dns-4.1.108.Final.jar
- netty-transport-4.1.108.Final.jar
- netty-transport-classes-epoll-4.1.108.Final.jar
- netty-transport-native-epoll-4.1.108.Final-linux-aarch_64.jar
- netty-transport-native-epoll-4.1.108.Final-linux-x86_64.jar
- netty-transport-native-unix-common-4.1.108.Final.jar
- netty-transport-native-unix-common-4.1.108.Final-linux-x86_64.jar
Expand All @@ -379,6 +380,8 @@ The Apache Software License, Version 2.0
- simpleclient_tracer_common-0.16.0.jar
- simpleclient_tracer_otel-0.16.0.jar
- simpleclient_tracer_otel_agent-0.16.0.jar
* RoaringBitmap
- RoaringBitmap-1.1.0.jar
* Log4J
- log4j-api-2.18.0.jar
- log4j-core-2.18.0.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,13 @@ public long getEstimatedSizeSinceMarkDeletePosition() {
return ledger.estimateBacklogFromPosition(markDeletePosition);
}

private long getNumberOfEntriesInBacklog() {
if (markDeletePosition.compareTo(ledger.getLastPosition()) >= 0) {
return 0;
}
return getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
}

@Override
public long getNumberOfEntriesInBacklog(boolean isPrecise) {
if (log.isDebugEnabled()) {
Expand All @@ -1120,16 +1127,13 @@ public long getNumberOfEntriesInBacklog(boolean isPrecise) {
messagesConsumedCounter, markDeletePosition, readPosition);
}
if (isPrecise) {
if (markDeletePosition.compareTo(ledger.getLastPosition()) >= 0) {
return 0;
}
return getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
return getNumberOfEntriesInBacklog();
}

long backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger) - messagesConsumedCounter;
if (backlog < 0) {
// In some case the counters get incorrect values, fall back to the precise backlog count
backlog = getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
backlog = getNumberOfEntriesInBacklog();
}

return backlog;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -535,13 +536,12 @@ public CompletableFuture<Void> shutdownAsync() throws ManagedLedgerException {
int numLedgers = ledgerNames.size();
log.info("Closing {} ledgers", numLedgers);
for (String ledgerName : ledgerNames) {
CompletableFuture<Void> future = new CompletableFuture<>();
futures.add(future);
CompletableFuture<ManagedLedgerImpl> ledgerFuture = ledgers.remove(ledgerName);
if (ledgerFuture == null) {
future.complete(null);
continue;
}
CompletableFuture<Void> future = new CompletableFuture<>();
futures.add(future);
ledgerFuture.whenCompleteAsync((managedLedger, throwable) -> {
if (throwable != null || managedLedger == null) {
future.complete(null);
Expand Down Expand Up @@ -606,68 +606,20 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}));
}
}));
entryCacheManager.clear();
return FutureUtil.waitForAll(futures).thenAccept(__ -> {
return FutureUtil.waitForAll(futures).thenAcceptAsync(__ -> {
//wait for tasks in scheduledExecutor executed.
scheduledExecutor.shutdown();
scheduledExecutor.shutdownNow();
entryCacheManager.clear();
});
}

@Override
public void shutdown() throws InterruptedException, ManagedLedgerException {
if (closed) {
throw new ManagedLedgerException.ManagedLedgerFactoryClosedException();
try {
shutdownAsync().get();
} catch (ExecutionException e) {
throw getManagedLedgerException(e.getCause());
}
closed = true;

statsTask.cancel(true);
flushCursorsTask.cancel(true);
cacheEvictionExecutor.shutdownNow();

// take a snapshot of ledgers currently in the map to prevent race conditions
List<CompletableFuture<ManagedLedgerImpl>> ledgers = new ArrayList<>(this.ledgers.values());
int numLedgers = ledgers.size();
final CountDownLatch latch = new CountDownLatch(numLedgers);
log.info("Closing {} ledgers", numLedgers);

for (CompletableFuture<ManagedLedgerImpl> ledgerFuture : ledgers) {
ManagedLedgerImpl ledger = ledgerFuture.getNow(null);
if (ledger == null) {
latch.countDown();
continue;
}

ledger.asyncClose(new AsyncCallbacks.CloseCallback() {
@Override
public void closeComplete(Object ctx) {
latch.countDown();
}

@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Got exception when closing managed ledger: {}", ledger.getName(), exception);
latch.countDown();
}
}, null);
}

latch.await();
log.info("{} ledgers closed", numLedgers);

if (isBookkeeperManaged) {
try {
BookKeeper bookkeeper = bookkeeperFactory.get();
if (bookkeeper != null) {
bookkeeper.close();
}
} catch (BKException e) {
throw new ManagedLedgerException(e);
}
}

scheduledExecutor.shutdownNow();

entryCacheManager.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,10 @@ private DeleteLedgerInfo makeDelayIfDoLedgerDelete(LedgerHandle ledger, final At
bkc.asyncDeleteLedger(ledgerId, originalCb, ctx);
} else {
deleteLedgerInfo.hasCalled = true;
new Thread(() -> {
cachedExecutor.submit(() -> {
Awaitility.await().atMost(Duration.ofSeconds(60)).until(signal::get);
bkc.asyncDeleteLedger(ledgerId, cb, ctx);
}).start();
});
}
return null;
}).when(spyBookKeeper).asyncDeleteLedger(any(long.class), any(AsyncCallback.DeleteCallback.class), any());
Expand All @@ -220,6 +220,7 @@ private DeleteLedgerInfo makeDelayIfDoLedgerDelete(LedgerHandle ledger, final At
public void testLedgerInfoMetaCorrectIfAddEntryTimeOut() throws Exception {
String mlName = "testLedgerInfoMetaCorrectIfAddEntryTimeOut";
BookKeeper spyBookKeeper = spy(bkc);
@Cleanup("shutdown")
ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, spyBookKeeper);
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);

Expand Down Expand Up @@ -3938,6 +3939,7 @@ public void testCancellationOfScheduledTasks() throws Exception {
public void testInactiveLedgerRollOver() throws Exception {
int inactiveLedgerRollOverTimeMs = 5;
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
@Cleanup("shutdown")
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setInactiveLedgerRollOverTime(inactiveLedgerRollOverTimeMs, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -3969,7 +3971,6 @@ public void testInactiveLedgerRollOver() throws Exception {
List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
assertEquals(ledgers.size(), totalAddEntries);
ledger.close();
factory.shutdown();
}

@Test
Expand Down Expand Up @@ -4022,6 +4023,7 @@ public void testDontRollOverInactiveLedgersWhenMetadataServiceInvalid() throws E

@Test
public void testOffloadTaskCancelled() throws Exception {
@Cleanup("shutdown")
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,9 @@ protected void startZKCluster() throws Exception {
zkc = zkUtil.getZooKeeperClient();
metadataStore = new FaultInjectionMetadataStore(
MetadataStoreExtended.create(zkUtil.getZooKeeperConnectString(),
MetadataStoreConfig.builder().build()));
MetadataStoreConfig.builder()
.metadataStoreName("metastore-" + getClass().getSimpleName())
.build()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import lombok.SneakyThrows;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
Expand Down Expand Up @@ -70,7 +71,8 @@ public MockedBookKeeperTestCase(int numBookies) {
public final void setUp(Method method) throws Exception {
LOG.info(">>>>>> starting {}", method);
metadataStore = new FaultInjectionMetadataStore(
MetadataStoreExtended.create("memory:local", MetadataStoreConfig.builder().build()));
MetadataStoreExtended.create("memory:local",
MetadataStoreConfig.builder().metadataStoreName("metastore-" + method.getName()).build()));

try {
// start bookkeeper service
Expand Down Expand Up @@ -102,7 +104,11 @@ public final void tearDown(Method method) {
}
try {
LOG.info("@@@@@@@@@ stopping " + method);
factory.shutdownAsync().get(10, TimeUnit.SECONDS);
try {
factory.shutdownAsync().get(10, TimeUnit.SECONDS);
} catch (ManagedLedgerException.ManagedLedgerFactoryClosedException e) {
// ignore
}
factory = null;
stopBookKeeper();
metadataStore.close();
Expand Down
Loading
Loading