Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][misc] Sync commits from apache into 3.1_ds #349

Merged
merged 11 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build/run_unit_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ alias echo='{ [[ $- =~ .*x.* ]] && trace_enabled=1 || trace_enabled=0; set +x; }
# Test Groups -- start --
function test_group_broker_group_1() {
mvn_test -pl pulsar-broker -Dgroups='broker' -DtestReuseFork=true
# run tests in broker-isolated group individually (instead of with -Dgroups=broker-isolated) to avoid scanning all test classes
mvn_test -pl pulsar-broker -Dtest=org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGeneratorWithNoUnsafeTest -DtestForkCount=1 -DtestReuseFork=false
}

function test_group_broker_group_2() {
Expand Down
4 changes: 0 additions & 4 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,6 @@ MIT License
* The Checker Framework
- checker-qual-3.33.0.jar

Protocol Buffers License
* Protocol Buffers
- protobuf-java-3.25.5.jar -- ../licenses/LICENSE-protobuf.txt

CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
* Java Annotations API
- javax.annotation-api-1.3.2.jar
Expand Down
4 changes: 4 additions & 0 deletions jetcd-core-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@
<include>io.etcd:*</include>
<include>io.vertx:*</include>
</includes>
<excludes>
<!-- This is required for execute shade multiple times without clean -->
<exclude>org.apache.pulsar:jetcd-core-shaded</exclude>
</excludes>
</artifactSet>
<relocations>
<!-- relocate vertx packages since they will be transformed to use grpc-netty-shaded packages -->
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1669,7 +1669,7 @@ flexible messaging model and an intuitive client API.</description>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>${testJacocoAgentArgument} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${testHeapDumpPath} -XX:+ExitOnOutOfMemoryError -Xmx2G -XX:+UseZGC
<argLine>${testJacocoAgentArgument} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${testHeapDumpPath} -XX:+ExitOnOutOfMemoryError -Xmx3G -XX:+UseZGC
-Dpulsar.allocator.pooled=true
-Dpulsar.allocator.leak_detection=Advanced
-Dpulsar.allocator.exit_on_oom=false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,21 @@ public String getSubscription() {
return subscription;
}

@Override
public boolean hasDataFromHttp() {
return authData.hasDataFromHttp();
}

@Override
public String getHttpAuthType() {
return authData.getHttpAuthType();
}

@Override
public String getHttpHeader(String name) {
return authData.getHttpHeader(name);
}

public AuthenticationDataSource getAuthData() {
return authData;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.authentication;

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.AssertJUnit.assertEquals;
import javax.servlet.http.HttpServletRequest;
import org.testng.annotations.Test;

public class AuthenticationDataSubscriptionTest {

AuthenticationDataSubscription target;

@Test
public void testTargetFromAuthenticationDataHttp(){
var req = mock(HttpServletRequest.class);
String headerName = "Authorization";
String headerValue = "my-header";
String authType = "my-authType";
doReturn(headerValue).when(req).getHeader(eq(headerName));
doReturn("localhost").when(req).getRemoteAddr();
doReturn(4000).when(req).getRemotePort();
doReturn(authType).when(req).getAuthType();
AuthenticationDataSource authenticationDataSource = new AuthenticationDataHttp(req);
target = new AuthenticationDataSubscription(authenticationDataSource, "my-sub");
assertEquals(headerValue, target.getHttpHeader(headerName));
assertEquals(authType, target.getHttpAuthType());
assertEquals(true, target.hasDataFromHttp());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,9 @@ public void start() throws PulsarServerException {
this.webSocketService.setLocalCluster(clusterData);
}

// Initialize namespace service, after service url assigned. Should init zk and refresh self owner info.
this.nsService.initialize();

// Start the leader election service
startLeaderElectionService();

Expand All @@ -865,9 +868,6 @@ public void start() throws PulsarServerException {
// (namespace service depends on load manager)
this.startLoadManagementService();

// Initialize namespace service, after service url assigned. Should init zk and refresh self owner info.
this.nsService.initialize();

// Start topic level policies service
if (config.isTopicLevelPoliciesEnabled() && config.isSystemTopicEnabled()) {
this.topicPoliciesService = new SystemTopicBasedTopicPoliciesService(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
}

LookupData lookupData = lookupResult.get().getLookupData();
printWarnLogIfLookupResUnexpected(topicName, lookupData, options, pulsarService);
if (lookupResult.get().isRedirect()) {
boolean newAuthoritative = lookupResult.get().isAuthoritativeRedirect();
lookupfuture.complete(
Expand All @@ -338,6 +339,24 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
return lookupfuture;
}

/**
* Check if a internal client will get a null lookup result.
*/
private static void printWarnLogIfLookupResUnexpected(TopicName topic, LookupData lookupData, LookupOptions options,
PulsarService pulsar) {
if (!pulsar.getBrokerService().isSystemTopic(topic)) {
return;
}
boolean tlsEnabled = pulsar.getConfig().isBrokerClientTlsEnabled();
if (!tlsEnabled && StringUtils.isBlank(lookupData.getBrokerUrl())) {
log.warn("[{}] Unexpected lookup result: brokerUrl is required when TLS isn't enabled. options: {},"
+ " result {}", topic, options, lookupData);
} else if (tlsEnabled && StringUtils.isBlank(lookupData.getBrokerUrlTls())) {
log.warn("[{}] Unexpected lookup result: brokerUrlTls is required when TLS is enabled. options: {},"
+ " result {}", topic, options, lookupData);
}
}

private static void handleLookupError(CompletableFuture<ByteBuf> lookupFuture, String topicName, String clientAppId,
long requestId, Throwable ex){
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class OwnershipCache {
/**
* The NamespaceEphemeralData objects that can be associated with the current owner, when the broker is disabled.
*/
private final NamespaceEphemeralData selfOwnerInfoDisabled;
private NamespaceEphemeralData selfOwnerInfoDisabled;

private final LockManager<NamespaceEphemeralData> lockManager;

Expand Down Expand Up @@ -121,6 +121,9 @@ public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory
this.pulsar = pulsar;
this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
// At this moment, the variables "webServiceAddress" and "webServiceAddressTls" and so on have not been
// initialized, so we will get an empty "selfOwnerInfo" and an empty "selfOwnerInfoDisabled" here.
// But do not worry, these two fields will be set by the method "refreshSelfOwnerInfo" soon.
this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
false, pulsar.getAdvertisedListeners());
Expand Down Expand Up @@ -353,6 +356,9 @@ public synchronized boolean refreshSelfOwnerInfo() {
this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(),
pulsar.getBrokerServiceUrlTls(), pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners());
this.selfOwnerInfoDisabled = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(),
pulsar.getBrokerServiceUrlTls(), pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(), true, pulsar.getAdvertisedListeners());
return selfOwnerInfo.getNativeUrl() != null || selfOwnerInfo.getNativeUrlTls() != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.PlatformDependent;
import io.prometheus.client.Collector;
import java.io.BufferedOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -363,19 +364,24 @@ protected ByteBuf generateMetrics(List<PrometheusRawMetricsProvider> metricsProv
}
}

private ByteBuf allocateMultipartCompositeDirectBuffer() {
ByteBuf allocateMultipartCompositeDirectBuffer() {
// use composite buffer with pre-allocated buffers to ensure that the pooled allocator can be used
// for allocating the buffers
ByteBufAllocator byteBufAllocator = PulsarByteBufAllocator.DEFAULT;
int chunkSize = resolveChunkSize(byteBufAllocator);
CompositeByteBuf buf = byteBufAllocator.compositeDirectBuffer(
ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
int chunkSize = resolveChunkSize(byteBufAllocator);
buf = byteBufAllocator.compositeDirectBuffer(
Math.max(MINIMUM_FOR_MAX_COMPONENTS, (initialBufferSize / chunkSize) + 1));
int totalLen = 0;
while (totalLen < initialBufferSize) {
totalLen += chunkSize;
// increase the capacity in increments of chunkSize to preallocate the buffers
// in the composite buffer
buf.capacity(totalLen);
int totalLen = 0;
while (totalLen < initialBufferSize) {
totalLen += chunkSize;
// increase the capacity in increments of chunkSize to preallocate the buffers
// in the composite buffer
buf.capacity(totalLen);
}
} else {
buf = byteBufAllocator.directBuffer(initialBufferSize);
}
return buf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
Expand Down Expand Up @@ -1943,5 +1944,49 @@ public void close() {
}
}
}

@Test
public void testTlsWithAuthParams() throws Exception {
final String topicName = "persistent://prop/ns-abc/newTopic";
final String subName = "newSub";
Authentication auth;

Set<String> providers = new HashSet<>();
providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");

conf.setAuthenticationEnabled(true);
conf.setAuthenticationProviders(providers);
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
conf.setTlsAllowInsecureConnection(false);
conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
conf.setNumExecutorThreadPoolSize(5);
restartBroker();

String authParam = String.format("tlsCertFile:%s,tlsKeyFile:%s", getTlsFileForClient("admin.cert"),
getTlsFileForClient("admin.key-pk8"));
String authClassName = "org.apache.pulsar.client.impl.auth.AuthenticationTls";
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl(brokerUrlTls.toString());
conf.setAuthParams(authParam);
conf.setAuthPluginClassName(authClassName);
conf.setTlsAllowInsecureConnection(true);

PulsarClient pulsarClient = null;
try {
pulsarClient = (new ClientBuilderImpl(conf)).build();

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscribe();
} catch (Exception e) {
fail("should not fail");
} finally {
pulsarClient.close();
}
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats.prometheus;

import static org.testng.Assert.assertFalse;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.PlatformDependent;
import java.time.Clock;
import lombok.Cleanup;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = "broker-isolated")
public class PrometheusMetricsGeneratorWithNoUnsafeTest {

@BeforeClass
static void setup() {
System.setProperty("io.netty.noUnsafe", "true");
}

@Test
public void testWriteStringWithNoUnsafe() {
assertFalse(PlatformDependent.hasUnsafe());
@Cleanup
PrometheusMetricsGenerator generator = new PrometheusMetricsGenerator(null, false, false, false, false,
Clock.systemUTC());
@Cleanup("release")
ByteBuf buf = generator.allocateMultipartCompositeDirectBuffer();
for (int i = 0; i < 2; i++) {
buf.writeBytes(new byte[1024 * 1024]);
}
SimpleTextOutputStream outputStream = new SimpleTextOutputStream(buf);
outputStream.write("test");
}
}
2 changes: 1 addition & 1 deletion pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
</parent>

<artifactId>pulsar-client-original</artifactId>
<name>Pulsar Client Java</name>
<name>Pulsar Client Java Original</name>

<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;

Expand All @@ -60,6 +61,9 @@ public PulsarClient build() throws PulsarClientException {
"Cannot get service url from service url provider.");
conf.setServiceUrl(conf.getServiceUrlProvider().getServiceUrl());
}
if (conf.getAuthentication() == null || conf.getAuthentication() == AuthenticationDisabled.INSTANCE) {
setAuthenticationFromPropsIfAvailable(conf);
}
PulsarClient client = new PulsarClientImpl(conf);
if (conf.getServiceUrlProvider() != null) {
conf.getServiceUrlProvider().initialize(client);
Expand Down
5 changes: 0 additions & 5 deletions pulsar-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,6 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>

<dependency>
<groupId>com.spotify</groupId>
<artifactId>completable-futures</artifactId>
Expand Down
Loading
Loading