diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index b6275903be6f0..3c85e6aca2b1a 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -77,6 +77,8 @@ alias echo='{ [[ $- =~ .*x.* ]] && trace_enabled=1 || trace_enabled=0; set +x; }
# Test Groups -- start --
function test_group_broker_group_1() {
mvn_test -pl pulsar-broker -Dgroups='broker' -DtestReuseFork=true
+ # run tests in broker-isolated group individually (instead of with -Dgroups=broker-isolated) to avoid scanning all test classes
+ mvn_test -pl pulsar-broker -Dtest=org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGeneratorWithNoUnsafeTest -DtestForkCount=1 -DtestReuseFork=false
}
function test_group_broker_group_2() {
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt
index f8670dc069d2a..746b67a6014dc 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -421,10 +421,6 @@ MIT License
* The Checker Framework
- checker-qual-3.33.0.jar
-Protocol Buffers License
- * Protocol Buffers
- - protobuf-java-3.25.5.jar -- ../licenses/LICENSE-protobuf.txt
-
CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
* Java Annotations API
- javax.annotation-api-1.3.2.jar
diff --git a/jetcd-core-shaded/pom.xml b/jetcd-core-shaded/pom.xml
index bdcfae9bc1641..ed24730b4dff5 100644
--- a/jetcd-core-shaded/pom.xml
+++ b/jetcd-core-shaded/pom.xml
@@ -94,6 +94,10 @@
io.etcd:*
io.vertx:*
+
+
+ org.apache.pulsar:jetcd-core-shaded
+
diff --git a/pom.xml b/pom.xml
index 74b931699f92b..78cc658d14661 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1669,7 +1669,7 @@ flexible messaging model and an intuitive client API.
org.apache.maven.plugins
maven-surefire-plugin
- ${testJacocoAgentArgument} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${testHeapDumpPath} -XX:+ExitOnOutOfMemoryError -Xmx2G -XX:+UseZGC
+ ${testJacocoAgentArgument} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${testHeapDumpPath} -XX:+ExitOnOutOfMemoryError -Xmx3G -XX:+UseZGC
-Dpulsar.allocator.pooled=true
-Dpulsar.allocator.leak_detection=Advanced
-Dpulsar.allocator.exit_on_oom=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSubscription.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSubscription.java
index 9a7324a6d077a..96b85989b6fc3 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSubscription.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSubscription.java
@@ -72,6 +72,21 @@ public String getSubscription() {
return subscription;
}
+ @Override
+ public boolean hasDataFromHttp() {
+ return authData.hasDataFromHttp();
+ }
+
+ @Override
+ public String getHttpAuthType() {
+ return authData.getHttpAuthType();
+ }
+
+ @Override
+ public String getHttpHeader(String name) {
+ return authData.getHttpHeader(name);
+ }
+
public AuthenticationDataSource getAuthData() {
return authData;
}
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationDataSubscriptionTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationDataSubscriptionTest.java
new file mode 100644
index 0000000000000..8af7b87fd221b
--- /dev/null
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationDataSubscriptionTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.testng.AssertJUnit.assertEquals;
+import javax.servlet.http.HttpServletRequest;
+import org.testng.annotations.Test;
+
+public class AuthenticationDataSubscriptionTest {
+
+ AuthenticationDataSubscription target;
+
+ @Test
+ public void testTargetFromAuthenticationDataHttp(){
+ var req = mock(HttpServletRequest.class);
+ String headerName = "Authorization";
+ String headerValue = "my-header";
+ String authType = "my-authType";
+ doReturn(headerValue).when(req).getHeader(eq(headerName));
+ doReturn("localhost").when(req).getRemoteAddr();
+ doReturn(4000).when(req).getRemotePort();
+ doReturn(authType).when(req).getAuthType();
+ AuthenticationDataSource authenticationDataSource = new AuthenticationDataHttp(req);
+ target = new AuthenticationDataSubscription(authenticationDataSource, "my-sub");
+ assertEquals(headerValue, target.getHttpHeader(headerName));
+ assertEquals(authType, target.getHttpAuthType());
+ assertEquals(true, target.hasDataFromHttp());
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 136b5b636f099..8252868266d1d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -854,6 +854,9 @@ public void start() throws PulsarServerException {
this.webSocketService.setLocalCluster(clusterData);
}
+ // Initialize namespace service, after service url assigned. Should init zk and refresh self owner info.
+ this.nsService.initialize();
+
// Start the leader election service
startLeaderElectionService();
@@ -865,9 +868,6 @@ public void start() throws PulsarServerException {
// (namespace service depends on load manager)
this.startLoadManagementService();
- // Initialize namespace service, after service url assigned. Should init zk and refresh self owner info.
- this.nsService.initialize();
-
// Start topic level policies service
if (config.isTopicLevelPoliciesEnabled() && config.isSystemTopicEnabled()) {
this.topicPoliciesService = new SystemTopicBasedTopicPoliciesService(this);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index 9a05c3d992aaf..adc65db380d36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -314,6 +314,7 @@ public static CompletableFuture lookupTopicAsync(PulsarService pulsarSe
}
LookupData lookupData = lookupResult.get().getLookupData();
+ printWarnLogIfLookupResUnexpected(topicName, lookupData, options, pulsarService);
if (lookupResult.get().isRedirect()) {
boolean newAuthoritative = lookupResult.get().isAuthoritativeRedirect();
lookupfuture.complete(
@@ -338,6 +339,24 @@ public static CompletableFuture lookupTopicAsync(PulsarService pulsarSe
return lookupfuture;
}
+ /**
+ * Check if a internal client will get a null lookup result.
+ */
+ private static void printWarnLogIfLookupResUnexpected(TopicName topic, LookupData lookupData, LookupOptions options,
+ PulsarService pulsar) {
+ if (!pulsar.getBrokerService().isSystemTopic(topic)) {
+ return;
+ }
+ boolean tlsEnabled = pulsar.getConfig().isBrokerClientTlsEnabled();
+ if (!tlsEnabled && StringUtils.isBlank(lookupData.getBrokerUrl())) {
+ log.warn("[{}] Unexpected lookup result: brokerUrl is required when TLS isn't enabled. options: {},"
+ + " result {}", topic, options, lookupData);
+ } else if (tlsEnabled && StringUtils.isBlank(lookupData.getBrokerUrlTls())) {
+ log.warn("[{}] Unexpected lookup result: brokerUrlTls is required when TLS is enabled. options: {},"
+ + " result {}", topic, options, lookupData);
+ }
+ }
+
private static void handleLookupError(CompletableFuture lookupFuture, String topicName, String clientAppId,
long requestId, Throwable ex){
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 5833531f8de9d..3f0483f58adc3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -73,7 +73,7 @@ public class OwnershipCache {
/**
* The NamespaceEphemeralData objects that can be associated with the current owner, when the broker is disabled.
*/
- private final NamespaceEphemeralData selfOwnerInfoDisabled;
+ private NamespaceEphemeralData selfOwnerInfoDisabled;
private final LockManager lockManager;
@@ -121,6 +121,9 @@ public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory
this.pulsar = pulsar;
this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
+ // At this moment, the variables "webServiceAddress" and "webServiceAddressTls" and so on have not been
+ // initialized, so we will get an empty "selfOwnerInfo" and an empty "selfOwnerInfoDisabled" here.
+ // But do not worry, these two fields will be set by the method "refreshSelfOwnerInfo" soon.
this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
false, pulsar.getAdvertisedListeners());
@@ -353,6 +356,9 @@ public synchronized boolean refreshSelfOwnerInfo() {
this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(),
pulsar.getBrokerServiceUrlTls(), pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners());
+ this.selfOwnerInfoDisabled = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(),
+ pulsar.getBrokerServiceUrlTls(), pulsar.getWebServiceAddress(),
+ pulsar.getWebServiceAddressTls(), true, pulsar.getAdvertisedListeners());
return selfOwnerInfo.getNativeUrl() != null || selfOwnerInfo.getNativeUrlTls() != null;
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 5ca02e5f93dfd..e2ff03252d52a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -25,6 +25,7 @@
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
+import io.netty.util.internal.PlatformDependent;
import io.prometheus.client.Collector;
import java.io.BufferedOutputStream;
import java.io.IOException;
@@ -363,19 +364,24 @@ protected ByteBuf generateMetrics(List metricsProv
}
}
- private ByteBuf allocateMultipartCompositeDirectBuffer() {
+ ByteBuf allocateMultipartCompositeDirectBuffer() {
// use composite buffer with pre-allocated buffers to ensure that the pooled allocator can be used
// for allocating the buffers
ByteBufAllocator byteBufAllocator = PulsarByteBufAllocator.DEFAULT;
- int chunkSize = resolveChunkSize(byteBufAllocator);
- CompositeByteBuf buf = byteBufAllocator.compositeDirectBuffer(
+ ByteBuf buf;
+ if (PlatformDependent.hasUnsafe()) {
+ int chunkSize = resolveChunkSize(byteBufAllocator);
+ buf = byteBufAllocator.compositeDirectBuffer(
Math.max(MINIMUM_FOR_MAX_COMPONENTS, (initialBufferSize / chunkSize) + 1));
- int totalLen = 0;
- while (totalLen < initialBufferSize) {
- totalLen += chunkSize;
- // increase the capacity in increments of chunkSize to preallocate the buffers
- // in the composite buffer
- buf.capacity(totalLen);
+ int totalLen = 0;
+ while (totalLen < initialBufferSize) {
+ totalLen += chunkSize;
+ // increase the capacity in increments of chunkSize to preallocate the buffers
+ // in the composite buffer
+ buf.capacity(totalLen);
+ }
+ } else {
+ buf = byteBufAllocator.directBuffer(initialBufferSize);
}
return buf;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index c200cce09bf32..0e1fe3aa535d3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -103,6 +103,7 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
@@ -1943,5 +1944,49 @@ public void close() {
}
}
}
+
+ @Test
+ public void testTlsWithAuthParams() throws Exception {
+ final String topicName = "persistent://prop/ns-abc/newTopic";
+ final String subName = "newSub";
+ Authentication auth;
+
+ Set providers = new HashSet<>();
+ providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");
+
+ conf.setAuthenticationEnabled(true);
+ conf.setAuthenticationProviders(providers);
+ conf.setBrokerServicePortTls(Optional.of(0));
+ conf.setWebServicePortTls(Optional.of(0));
+ conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
+ conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
+ conf.setTlsAllowInsecureConnection(false);
+ conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
+ conf.setNumExecutorThreadPoolSize(5);
+ restartBroker();
+
+ String authParam = String.format("tlsCertFile:%s,tlsKeyFile:%s", getTlsFileForClient("admin.cert"),
+ getTlsFileForClient("admin.key-pk8"));
+ String authClassName = "org.apache.pulsar.client.impl.auth.AuthenticationTls";
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setServiceUrl(brokerUrlTls.toString());
+ conf.setAuthParams(authParam);
+ conf.setAuthPluginClassName(authClassName);
+ conf.setTlsAllowInsecureConnection(true);
+
+ PulsarClient pulsarClient = null;
+ try {
+ pulsarClient = (new ClientBuilderImpl(conf)).build();
+
+ @Cleanup
+ Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+ .subscribe();
+ } catch (Exception e) {
+ fail("should not fail");
+ } finally {
+ pulsarClient.close();
+ }
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorWithNoUnsafeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorWithNoUnsafeTest.java
new file mode 100644
index 0000000000000..006428b4815f1
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorWithNoUnsafeTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import static org.testng.Assert.assertFalse;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.PlatformDependent;
+import java.time.Clock;
+import lombok.Cleanup;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-isolated")
+public class PrometheusMetricsGeneratorWithNoUnsafeTest {
+
+ @BeforeClass
+ static void setup() {
+ System.setProperty("io.netty.noUnsafe", "true");
+ }
+
+ @Test
+ public void testWriteStringWithNoUnsafe() {
+ assertFalse(PlatformDependent.hasUnsafe());
+ @Cleanup
+ PrometheusMetricsGenerator generator = new PrometheusMetricsGenerator(null, false, false, false, false,
+ Clock.systemUTC());
+ @Cleanup("release")
+ ByteBuf buf = generator.allocateMultipartCompositeDirectBuffer();
+ for (int i = 0; i < 2; i++) {
+ buf.writeBytes(new byte[1024 * 1024]);
+ }
+ SimpleTextOutputStream outputStream = new SimpleTextOutputStream(buf);
+ outputStream.write("test");
+ }
+}
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index e8944cd0a4960..6ed57b946380e 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -30,7 +30,7 @@
pulsar-client-original
- Pulsar Client Java
+ Pulsar Client Java Original
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 7677045f0899b..f49b704445fbd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -34,6 +34,7 @@
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.api.SizeUnit;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
@@ -60,6 +61,9 @@ public PulsarClient build() throws PulsarClientException {
"Cannot get service url from service url provider.");
conf.setServiceUrl(conf.getServiceUrlProvider().getServiceUrl());
}
+ if (conf.getAuthentication() == null || conf.getAuthentication() == AuthenticationDisabled.INSTANCE) {
+ setAuthenticationFromPropsIfAvailable(conf);
+ }
PulsarClient client = new PulsarClientImpl(conf);
if (conf.getServiceUrlProvider() != null) {
conf.getServiceUrlProvider().initialize(client);
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 260ef2b44db09..76d65ce6dc11f 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -201,11 +201,6 @@
true
-
- com.google.protobuf
- protobuf-java
-
-
com.spotify
completable-futures
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java
index 171f5acd2bf75..6b6b0492b4525 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java
@@ -30,11 +30,14 @@
import java.security.Principal;
import java.security.PrivateKey;
import java.security.UnrecoverableKeyException;
+import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
+import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.X509ExtendedKeyManager;
@@ -88,18 +91,16 @@ private void updateKeyManager()
return;
}
- X509Certificate certificate;
- PrivateKey privateKey = null;
- KeyStore keyStore;
- try (InputStream publicCertStream = new FileInputStream(certFile.getFileName());
- InputStream privateKeyStream = new FileInputStream(keyFile.getFileName())) {
+ final KeyStore keyStore;
+ try (InputStream publicCertStream = new FileInputStream(certFile.getFileName())) {
final CertificateFactory cf = CertificateFactory.getInstance("X.509");
- certificate = (X509Certificate) cf.generateCertificate(publicCertStream);
+ final List certificateList = cf.generateCertificates(publicCertStream)
+ .stream().map(o -> (X509Certificate) o).collect(Collectors.toList());
keyStore = KeyStore.getInstance("JKS");
- String alias = certificate.getSubjectX500Principal().getName();
- privateKey = SecurityUtility.loadPrivateKeyFromPemFile(keyFile.getFileName());
+ final String alias = certificateList.get(0).getSubjectX500Principal().getName();
+ final PrivateKey privateKey = SecurityUtility.loadPrivateKeyFromPemFile(keyFile.getFileName());
keyStore.load(null);
- keyStore.setKeyEntry(alias, privateKey, KEYSTORE_PASSWORD, new X509Certificate[] { certificate });
+ keyStore.setKeyEntry(alias, privateKey, KEYSTORE_PASSWORD, certificateList.toArray(new Certificate[0]));
} catch (IOException | KeyManagementException e) {
throw new IllegalArgumentException(e);
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/KeyManagerProxyTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/KeyManagerProxyTest.java
new file mode 100644
index 0000000000000..5542f0b22ac95
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/KeyManagerProxyTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.util;
+
+import static org.testng.Assert.assertEquals;
+import com.google.common.io.Resources;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.Cleanup;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class KeyManagerProxyTest {
+
+ @DataProvider(name = "certDataProvider")
+ public static Object[][] caDataProvider() {
+ return new Object[][]{
+ {"ca/multiple-ca.pem", 2},
+ {"ca/single-ca.pem", 1}
+ };
+ }
+
+ @Test(dataProvider = "certDataProvider")
+ public void testLoadCert(String path, int certCount) {
+ final String certFilePath = Resources.getResource(path).getPath();
+ // This key is not paired with certs, but this is not a problem as the key is not used in this test
+ final String keyFilePath = Resources.getResource("ssl/my-ca/client-key.pem").getPath();
+ @Cleanup("shutdownNow")
+ final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+
+ final KeyManagerProxy keyManager = new KeyManagerProxy(certFilePath, keyFilePath, 60, scheduledExecutor);
+ assertEquals(keyManager.getCertificateChain("cn=test1").length, certCount);
+ }
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 8d7743dd791f1..dcd5bfde008f7 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -177,6 +177,14 @@ public CompletableFuture handleMetadataEvent(MetadataEvent event) {
return result;
}
+ /**
+ * @deprecated Use {@link #registerSyncListener(Optional)} instead.
+ */
+ @Deprecated
+ protected void registerSyncLister(Optional synchronizer) {
+ this.registerSyncListener(synchronizer);
+ }
+
protected void registerSyncListener(Optional synchronizer) {
synchronizer.ifPresent(s -> s.registerSyncListener(this::handleMetadataEvent));
}
diff --git a/src/set-project-version.sh b/src/set-project-version.sh
index cf67e37682ff1..cd3343871b3d5 100755
--- a/src/set-project-version.sh
+++ b/src/set-project-version.sh
@@ -36,8 +36,8 @@ pushd ${ROOT_DIR}
# Get the current version
OLD_VERSION=`python3 ${ROOT_DIR}/src/get-project-version.py`
-mvn versions:set -DnewVersion=$NEW_VERSION
-mvn versions:set -DnewVersion=$NEW_VERSION -pl buildtools
+mvn versions:set -DnewVersion=$NEW_VERSION -DgenerateBackupPoms=false
+mvn versions:set -DnewVersion=$NEW_VERSION -DgenerateBackupPoms=false -pl buildtools
# Set terraform ansible deployment pulsar version
sed -i -e "s/${OLD_VERSION}/${NEW_VERSION}/g" ${TERRAFORM_DIR}/deploy-pulsar.yaml
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java
index 4504b58ca920b..9ed65e4888ad8 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java
@@ -49,7 +49,7 @@ protected Map getEnv() {
result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(getNumEntriesPerLedger()));
result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
result.put("managedLedgerOffloadDriver", "filesystem");
- result.put("fileSystemURI", "file:///");
+ result.put("fileSystemURI", "file:///pulsar/data");
return result;
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestOffloadDeletionFS.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestOffloadDeletionFS.java
index bd1d2a49f7ba4..e058d8d277ba3 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestOffloadDeletionFS.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestOffloadDeletionFS.java
@@ -96,7 +96,7 @@ protected Map getEnv() {
result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(getNumEntriesPerLedger()));
result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
result.put("managedLedgerOffloadDriver", "filesystem");
- result.put("fileSystemURI", "file:///");
+ result.put("fileSystemURI", "file:///pulsar/data");
return result;
}
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
index 25b63374946c8..18d7eccff3521 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
@@ -105,7 +105,7 @@ private FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf, OrderedSchedu
this.configuration.setClassLoader(FileSystemLedgerOffloaderFactory.class.getClassLoader());
this.driverName = conf.getManagedLedgerOffloadDriver();
- this.storageBasePath = configuration.get("hadoop.tmp.dir");
+ this.storageBasePath = configuration.get("fs.defaultFS");
this.scheduler = scheduler;
this.fileSystem = FileSystem.get(configuration);
this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder()
diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java
new file mode 100644
index 0000000000000..14734b3faca99
--- /dev/null
+++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.filesystem.impl;
+
+import static org.testng.Assert.assertEquals;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.testng.annotations.Test;
+
+public class FileSystemOffloaderLocalFileTest {
+ private OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
+ private LedgerOffloaderStats offloaderStats = LedgerOffloaderStats.create(true, true, scheduler, 60);
+
+
+ private String getResourceFilePath(String name) {
+ return getClass().getClassLoader().getResource(name).getPath();
+ }
+
+ @Test
+ public void testReadWriteWithLocalFileUsingFileSystemURI() throws Exception {
+ // prepare the offload policies
+ final String basePath = "/tmp";
+ OffloadPoliciesImpl offloadPolicies = new OffloadPoliciesImpl();
+ offloadPolicies.setFileSystemURI("file://" + basePath);
+ offloadPolicies.setManagedLedgerOffloadDriver("filesystem");
+ offloadPolicies.setFileSystemProfilePath(getResourceFilePath("filesystem_offload_core_site.xml"));
+
+ // initialize the offloader with the offload policies
+ var offloader = FileSystemManagedLedgerOffloader.create(offloadPolicies, scheduler, offloaderStats);
+
+ int numberOfEntries = 100;
+
+ // prepare the data in bookkeeper
+ BookKeeper bk = new PulsarMockBookKeeper(scheduler);
+ LedgerHandle lh = bk.createLedger(1,1,1, BookKeeper.DigestType.CRC32, "".getBytes());
+ for (int i = 0; i < numberOfEntries; i++) {
+ byte[] entry = ("foobar"+i).getBytes();
+ lh.addEntry(entry);
+ }
+ lh.close();
+
+ ReadHandle read = bk.newOpenLedgerOp()
+ .withLedgerId(lh.getId())
+ .withDigestType(DigestType.CRC32)
+ .withPassword("".getBytes()).execute().get();
+
+ final String mlName = TopicName.get("testWriteLocalFIle").getPersistenceNamingEncoding();
+ Map offloadDriverMetadata = new HashMap<>();
+ offloadDriverMetadata.put("ManagedLedgerName", mlName);
+
+ UUID uuid = UUID.randomUUID();
+ offloader.offload(read, uuid, offloadDriverMetadata).get();
+ ReadHandle toTest = offloader.readOffloaded(read.getId(), uuid, offloadDriverMetadata).get();
+ assertEquals(toTest.getLastAddConfirmed(), read.getLastAddConfirmed());
+ LedgerEntries toTestEntries = toTest.read(0, numberOfEntries - 1);
+ LedgerEntries toWriteEntries = read.read(0,numberOfEntries - 1);
+ Iterator toTestIter = toTestEntries.iterator();
+ Iterator toWriteIter = toWriteEntries.iterator();
+ while(toTestIter.hasNext()) {
+ LedgerEntry toWriteEntry = toWriteIter.next();
+ LedgerEntry toTestEntry = toTestIter.next();
+
+ assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId());
+ assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId());
+ assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
+ assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer());
+ }
+ toTestEntries = toTest.read(1, numberOfEntries - 1);
+ toWriteEntries = read.read(1,numberOfEntries - 1);
+ toTestIter = toTestEntries.iterator();
+ toWriteIter = toWriteEntries.iterator();
+ while(toTestIter.hasNext()) {
+ LedgerEntry toWriteEntry = toWriteIter.next();
+ LedgerEntry toTestEntry = toTestIter.next();
+
+ assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId());
+ assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId());
+ assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
+ assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer());
+ }
+
+ // check the file located in the local file system
+ Path offloadedFilePath = Paths.get(basePath, mlName);
+ assertEquals(Files.exists(offloadedFilePath), true);
+ }
+}
diff --git a/tiered-storage/file-system/src/test/resources/filesystem_offload_core_site.xml b/tiered-storage/file-system/src/test/resources/filesystem_offload_core_site.xml
new file mode 100644
index 0000000000000..d26cec2cc60f0
--- /dev/null
+++ b/tiered-storage/file-system/src/test/resources/filesystem_offload_core_site.xml
@@ -0,0 +1,48 @@
+
+
+
+
+ fs.defaultFS
+
+
+
+ hadoop.tmp.dir
+ pulsar
+
+
+ io.file.buffer.size
+ 4096
+
+
+ io.seqfile.compress.blocksize
+ 1000000
+
+
+ io.seqfile.compression.type
+ BLOCK
+
+
+ io.map.index.interval
+ 128
+
+
+