Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][build, broker, client, test, sql] Cherry-pick 2.10_ds improvements onto 3.1_ds (part I) #205

Merged
merged 17 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/pulsar-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ jobs:
run: |
mvn -B -T 1C -ntp -Pcore-modules,-main clean install -DskipTests -Dlicense.skip=true -Drat.skip=true -Dcheckstyle.skip=true

- name: Check binary licenses
run: src/check-binary-license.sh ./distribution/server/target/apache-pulsar-*-bin.tar.gz
# - name: Check binary licenses
# run: src/check-binary-license.sh ./distribution/server/target/apache-pulsar-*-bin.tar.gz

- name: Install gh-actions-artifact-client.js
uses: apache/pulsar-test-infra/gh-actions-artifact-client/dist@master
Expand Down Expand Up @@ -789,8 +789,8 @@ jobs:
-Dspotbugs.skip=true -Dlicense.skip=true -Dcheckstyle.skip=true -Drat.skip=true

# check full build artifacts licenses
- name: Check binary licenses
run: src/check-binary-license.sh ./distribution/server/target/apache-pulsar-*-bin.tar.gz && src/check-binary-license.sh ./distribution/shell/target/apache-pulsar-shell-*-bin.tar.gz
# - name: Check binary licenses
# run: src/check-binary-license.sh ./distribution/server/target/apache-pulsar-*-bin.tar.gz && src/check-binary-license.sh ./distribution/shell/target/apache-pulsar-shell-*-bin.tar.gz

- name: Clean up disk space
run: |
Expand Down
2 changes: 1 addition & 1 deletion build/run_unit_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ function test_group_proxy() {

function test_group_other() {
mvn_test --clean --install \
-pl '!org.apache.pulsar:distribution,!org.apache.pulsar:pulsar-offloader-distribution,!org.apache.pulsar:pulsar-server-distribution,!org.apache.pulsar:pulsar-io-distribution,!org.apache.pulsar:pulsar-all-docker-image' \
-pl '!com.datastax.oss:distribution,!com.datastax.oss:pulsar-offloader-distribution,!com.datastax.oss:pulsar-server-distribution,!com.datastax.oss:pulsar-io-distribution,!com.datastax.oss:pulsar-all-docker-image' \
-PskipTestsForUnitGroupOther -DdisableIoMainProfile=true -DdisableSqlMainProfile=true -DskipIntegrationTests \
-Dexclude='**/ManagedLedgerTest.java,
**/OffloadersCacheTest.java
Expand Down
29 changes: 29 additions & 0 deletions buildtools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -265,4 +265,33 @@
</extension>
</extensions>
</build>
<profiles>
<profile>
<id>jdk11</id>
<activation>
<jdk>[11,)</jdk>
</activation>
<properties>
<!-- required for running tests on JDK11+ -->
<test.additional.args>
--add-opens java.base/jdk.internal.loader=ALL-UNNAMED
--add-opens java.base/java.lang=ALL-UNNAMED <!--Mockito-->
</test.additional.args>
</properties>
</profile>
</profiles>


<distributionManagement>
<repository>
<id>datastax-releases</id>
<name>DataStax Local Releases</name>
<url>https://repo.aws.dsinternal.org/artifactory/datastax-public-releases-local/</url>
</repository>
<snapshotRepository>
<id>datastax-snapshots-local</id>
<name>DataStax Local Snapshots</name>
<url>https://repo.aws.dsinternal.org/artifactory/datastax-snapshots-local/</url>
</snapshotRepository>
</distributionManagement>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.naming.TopicName;

@Slf4j
public final class LedgerOffloaderStatsImpl implements LedgerOffloaderStats, Runnable {
private static final String TOPIC_LABEL = "topic";
private static final String NAMESPACE_LABEL = "namespace";
Expand Down
178 changes: 177 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<name>Pulsar</name>
<description>Pulsar is a distributed pub-sub messaging platform with a very
flexible messaging model and an intuitive client API.</description>
<url>https://github.com/apache/pulsar</url>
<url>https://github.com/datastax/pulsar</url>

<organization>
<name>Apache Software Foundation</name>
Expand Down Expand Up @@ -2528,6 +2528,138 @@ flexible messaging model and an intuitive client API.</description>
<module>pulsar-sql</module>
</modules>
</profile>
<profile>
<!-- Profile for deploying artifacts to Maven Central.-->
<!-- Copied from apache-release profile.-->
<!-- Sources, javadoc and gpg plugin will apply.-->
<id>datastax-release</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.apache.resources</groupId>
<artifactId>apache-source-release-assembly-descriptor</artifactId>
<version>1.0.6</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>source-release-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<runOnlyAtExecutionRoot>true</runOnlyAtExecutionRoot>
<descriptorRefs>
<descriptorRef>${sourceReleaseAssemblyDescriptor}</descriptorRef>
</descriptorRefs>
<tarLongFileMode>posix</tarLongFileMode>
</configuration>
</execution>
</executions>
</plugin>
<!-- We want to deploy the artifact to a staging location for perusal -->
<plugin>
<inherited>true</inherited>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<updateReleaseInfo>true</updateReleaseInfo>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- We want to sign the artifact, the POM, and all attached artifacts -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<executions>
<execution>
<id>sign-release-artifacts</id>
<goals>
<goal>sign</goal>
</goals>
<!-- Prevent gpg from using pinentry programs. Fixes: gpg: signing
failed: Inappropriate ioctl for device -->
<configuration>
<gpgArguments>
<arg>--pinentry-mode</arg>
<arg>loopback</arg>
</gpgArguments>
</configuration>
</execution>
</executions>
</plugin>
<!-- calculate checksums of source release for Apache dist area -->
<plugin>
<groupId>net.nicoulaj.maven.plugins</groupId>
<artifactId>checksum-maven-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<id>source-release-checksum</id>
<goals>
<goal>files</goal>
</goals>
</execution>
</executions>
<configuration>
<algorithms>
<algorithm>SHA-512</algorithm>
</algorithms>
<csvSummary>false</csvSummary>
<fileSets>
<fileSet>
<directory>${project.build.directory}</directory>
<includes>
<include>${project.artifactId}-${project.version}-source-release.zip</include>
<include>${project.artifactId}-${project.version}-source-release.tar*</include>
</includes>
</fileSet>
</fileSets>
<failIfNoFiles>false</failIfNoFiles><!-- usually, no file to do checksum: don't consider error -->
</configuration>
</plugin>
</plugins>
</build>

<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
<repository>
<id>ossrh</id>
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2</url>
</repository>
</distributionManagement>
</profile>
</profiles>

<repositories>
Expand All @@ -2554,5 +2686,49 @@ flexible messaging model and an intuitive client API.</description>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>datastax-releases</id>
<url>https://repo.aws.dsinternal.org/artifactory/datastax-releases-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
<repository>
<id>datastax-snapshots-local</id>
<url>https://repo.aws.dsinternal.org/artifactory/datastax-snapshots-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</repository>
<repository>
<id>public-datastax-releases</id>
<url>https://repo.datastax.com/datastax-public-releases-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>

<distributionManagement>
<repository>
<id>datastax-releases</id>
<name>DataStax Local Releases</name>
<url>https://repo.aws.dsinternal.org/artifactory/datastax-public-releases-local/</url>
</repository>
<snapshotRepository>
<id>datastax-snapshots-local</id>
<name>DataStax Local Snapshots</name>
<url>https://repo.aws.dsinternal.org/artifactory/datastax-snapshots-local/</url>
</snapshotRepository>
</distributionManagement>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -1898,22 +1898,21 @@ public void testDispatcherMaxReadSizeBytes() throws Exception {

admin.topics().unload(topicName);

ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) client.newConsumer(Schema.BYTES)
.topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
.subscribe();


PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
PersistentTopic topic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, true, Map.of()).get().get();
TopicCompactionService topicCompactionService = Mockito.spy(topic.getTopicCompactionService());
FieldUtils.writeDeclaredField(topic, "topicCompactionService", topicCompactionService, true);

ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) client.newConsumer(Schema.BYTES)
.topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
.subscribe();

Awaitility.await().untilAsserted(() -> {
assertEquals(consumer.getStats().getMsgNumInReceiverQueue(),
1);
});

consumer.increaseAvailablePermits(2);

Mockito.verify(topicCompactionService, Mockito.times(1)).readCompactedEntries(Mockito.any(), Mockito.same(1));

consumer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -48,8 +52,10 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.admin.cli.utils.TopicSubscriptionsVisualizer;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand Down Expand Up @@ -77,6 +83,7 @@
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.RelativeTimeUtil;
Expand Down Expand Up @@ -109,6 +116,7 @@ public CmdTopics(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("stats", new GetStats());
jcommander.addCommand("stats-internal", new GetInternalStats());
jcommander.addCommand("info-internal", new GetInternalInfo());
jcommander.addCommand("subscriptions-visual-stats", new GetSubscriptionsVisualStats());

jcommander.addCommand("partitioned-stats", new GetPartitionedStats());
jcommander.addCommand("partitioned-stats-internal", new GetPartitionedStatsInternal());
Expand Down Expand Up @@ -844,6 +852,33 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Get the internal metadata info for the topic")
private class GetSubscriptionsVisualStats extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-f", "--filename" }, description = "Output filename")
private String outputFilename;

@Override
@SneakyThrows
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
final PersistentTopicInternalStats internalStats = getTopics().getInternalStats(topic);
final TopicStats stats = getTopics().getStats(topic);

final String result = TopicSubscriptionsVisualizer.createHtml(topic, internalStats, stats);
if (outputFilename != null) {
final Path path = new File(outputFilename).toPath();
Files.write(path, result.getBytes(StandardCharsets.UTF_8));
print("Written to " + path.toFile().getAbsolutePath());
} else {
print(result);
}

}
}

@Parameters(commandDescription = "Get the stats for the partitioned topic "
+ "and its connected producers and consumers. All the rates are computed over a 1 minute window "
+ "and are relative the last completed 1 minute period.")
Expand Down
Loading
Loading